Receiving data from a socket
SPDK reads data from the socket with POSIX readv. The readv() system call reads data from the file associated with the file descriptor fd into iovcnt buffers, storing it in the buffers described by iov ("scatter input"). Here it reads into two buffers: the receive pipe is a circular buffer, so the writable region it hands back can span up to two segments.
static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
struct iovec iov[2];
int bytes_avail, bytes_recvd;
struct spdk_posix_sock_group_impl *group;
bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
if (bytes_avail <= 0) {
return bytes_avail;
}
if (sock->ssl) {
bytes_recvd = SSL_readv(sock->ssl, iov, 2);
} else {
bytes_recvd = readv(sock->fd, iov, 2);★
}
assert(sock->pipe_has_data == false);
if (bytes_recvd <= 0) {
/* Errors count as draining the socket data */
if (sock->base.group_impl && sock->socket_has_data) {
group = __posix_group_impl(sock->base.group_impl);
TAILQ_REMOVE(&group->socks_with_data, sock, link);
}
sock->socket_has_data = false;
return bytes_recvd;
}
spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);
#if DEBUG
if (sock->base.group_impl) {
assert(sock->socket_has_data == true);
}
#endif
sock->pipe_has_data = true;
if (bytes_recvd < bytes_avail) {
/* We drained the kernel socket entirely. */
sock->socket_has_data = false;
}
return bytes_recvd;
}
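As a standalone illustration of readv()'s scatter-input behavior, a minimal sketch that reads from stdin instead of a socket (buffer sizes here are arbitrary):

#include <stdio.h>
#include <sys/uio.h>
#include <unistd.h>

int main(void)
{
	char head[4], tail[8];
	struct iovec iov[2] = {
		{ .iov_base = head, .iov_len = sizeof(head) },
		{ .iov_base = tail, .iov_len = sizeof(tail) },
	};

	/* Read up to 12 bytes in one call: the first 4 land in head,
	 * the remainder in tail ("scatter input"). */
	ssize_t n = readv(STDIN_FILENO, iov, 2);
	if (n < 0) {
		perror("readv");
		return 1;
	}
	printf("read %zd bytes\n", n);
	return 0;
}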
There is a wealth of public information on the network software stack. No single site explains it completely, so two sources that explain it well are introduced here.
Packet queueing in the Linux network stack. | Download Scientific Diagram (researchgate.net)
When the NIC receives a TCP packet, it DMA-transfers it into the RX ring buffer in memory allocated by the NIC driver. The upper-layer application, SPDK, then consumes the received data through read-family system calls on the socket.
RX ring buffer
The RX ring buffer is a FIFO circular buffer that holds packets received by the NIC. The Linux kernel processes packets arriving in the RX ring via the ksoftirqd -> net_rx_action handler, which calls a driver-specific polling function (e.g., igb_poll).
- The NIC stores arriving packets into the rx ring in memory via DMA (generally with no host-CPU involvement). The received data copied into memory is managed per packet as sk_buffs.
- After the data has been transferred to memory, the NIC sends a hardware interrupt to the CPU.
- The CPU that takes the hardware interrupt acknowledges it (top half) and raises a software interrupt (NET_RX_SOFTIRQ) toward ksoftirqd (bottom half). This lets hardware-interrupt handling for further packets from the NIC proceed in parallel with the softirq processing on the CPU. Meanwhile, ksoftirqd starts consuming data from the RX buffer in the background without blocking the whole process (see the driver sketch after this list).
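A highly simplified driver-side sketch of this top-half/bottom-half split. struct nic_priv and the nic_* helpers are hypothetical placeholders; napi_schedule, napi_gro_receive, and napi_complete_done are real kernel APIs, but real drivers (e.g., igb) contain far more logic:

static irqreturn_t nic_irq_handler(int irq, void *data)
{
	struct nic_priv *priv = data;

	nic_disable_rx_irq(priv);	/* hypothetical register write (top half) */
	napi_schedule(&priv->napi);	/* raises NET_RX_SOFTIRQ; the poll runs in softirq/ksoftirqd context */
	return IRQ_HANDLED;
}

static int nic_poll(struct napi_struct *napi, int budget)
{
	struct nic_priv *priv = container_of(napi, struct nic_priv, napi);
	int done = 0;

	/* Bottom half: drain up to 'budget' packets the NIC already DMA'd into the rx ring. */
	while (done < budget) {
		struct sk_buff *skb = nic_next_rx_skb(priv);	/* hypothetical: pop one packet from the ring */
		if (!skb)
			break;
		napi_gro_receive(napi, skb);	/* hand the packet up the network stack */
		done++;
	}
	if (done < budget) {
		napi_complete_done(napi, done);
		nic_enable_rx_irq(priv);	/* hypothetical */
	}
	return done;
}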
TCPlinux.pdf (sn0rt.github.io)
readv
Receiving data from a socket goes through sockfs and uses the same readv as a regular file. The implementation of readv is sock_read_iter, defined in socket_file_ops; internally the call chain is sock_read_iter -> sock_recvmsg -> tcp_recvmsg.
hiboma/sockfs.md at master · hiboma/hiboma · GitHub
static ssize_t sock_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
struct file *file = iocb->ki_filp;
struct socket *sock = file->private_data;
struct msghdr msg = {.msg_iter = *to,
.msg_iocb = iocb};
ssize_t res;
if (file->f_flags & O_NONBLOCK || (iocb->ki_flags & IOCB_NOWAIT))
msg.msg_flags = MSG_DONTWAIT;
if (iocb->ki_pos != 0)
return -ESPIPE;
if (!iov_iter_count(to)) /* Match SYS5 behaviour */
return 0;
res = sock_recvmsg(sock, &msg, msg.msg_flags);★
*to = msg.msg_iter;
return res;
}
/**
* sock_recvmsg - receive a message from @sock
* @sock: socket
* @msg: message to receive
* @flags: message flags
*
* Receives @msg from @sock, passing through LSM. Returns the total number
* of bytes received, or an error.
*/
int sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags)
{
int err = security_socket_recvmsg(sock, msg, msg_data_left(msg), flags);
return err ?: sock_recvmsg_nosec(sock, msg, flags);
}
EXPORT_SYMBOL(sock_recvmsg);
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
int flags)
{
int ret = INDIRECT_CALL_INET(sock->ops->recvmsg, inet6_recvmsg,
inet_recvmsg, sock, msg,
msg_data_left(msg), flags);
if (trace_sock_recv_length_enabled())
call_trace_sock_recv_length(sock->sk, ret, flags);
return ret;
}
sk_buff
Inside tcp_recvmsg, data is copied from the sock structure to user space.
Linux sockets are embodied by struct socket and struct sock. Each socket structure holds BSD-socket-level information (state, type) and a reference to a struct sock. struct sock holds the protocol-specific information for the BSD socket; for an INET (Internet address domain) socket, struct sock holds the TCP/IP- and UDP/IP-specific information. The sk_buff queue structure (sk_buff_head) is also tied to struct sock: sk_receive_queue, which stores the sk_buff structures pointing at received packets, is held by struct sock.
An sk_buff is the buffer that holds packet data: each packet is stored in an sk_buff and handled in that form by the Linux network layers. struct sk_buff itself is a management structure; the area that actually stores the packet data is separate, and sk_buff tracks positions within it through its head, data, tail, and end pointers.
sk_buff lets multiple processing stages share the same data, avoiding memory copies when data is handed between layers. A packet received from the NIC travels up through the Ethernet, IP, and TCP layers while its headers are parsed, and struct sk_buff keeps pointers to each layer's header so that each header can be accessed directly.
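A sketch of how each layer peels its header off an sk_buff without copying the payload (kernel context; simplified, with validation and error handling omitted):

static void parse_layers(struct sk_buff *skb)
{
	struct ethhdr *eth = (struct ethhdr *)skb->data;
	struct iphdr *iph;

	skb_pull(skb, sizeof(*eth));		/* advance skb->data past the Ethernet header */
	skb_reset_network_header(skb);		/* record where the IP header starts */

	iph = ip_hdr(skb);			/* returns the saved network-header position */
	skb_pull(skb, iph->ihl * 4);
	skb_reset_transport_header(skb);	/* record where the TCP header starts */

	/* tcp_hdr(skb) is now usable; skb->data points at the TCP header. */
}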
sk_buff – Linuxカーネルメモ (bit-hive.com)
tcp_recvmsg
A data-receive request from the application goes through the socket layer and calls inet_recvmsg (via the struct proto_ops inet_stream_ops interface table). inet_recvmsg immediately calls tcp_recvmsg (via the struct proto tcp_prot interface table).
tcp_recvmsg copies the data of the packets linked on the socket's receive_queue to user space and frees the packets. If there is no data on the receive_queue, it waits for a packet to arrive.
internal22-282-受信処理アルゴリズム – Linux Kernel Documents Wiki – Linux Kernel Documents – OSDN
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags,
int *addr_len)
{
int cmsg_flags = 0, ret;
struct scm_timestamping_internal tss;
if (unlikely(flags & MSG_ERRQUEUE))
return inet_recv_error(sk, msg, len, addr_len);
if (sk_can_busy_loop(sk) &&
skb_queue_empty_lockless(&sk->sk_receive_queue) &&
sk->sk_state == TCP_ESTABLISHED)
sk_busy_loop(sk, flags & MSG_DONTWAIT);
lock_sock(sk);
ret = tcp_recvmsg_locked(sk, msg, len, flags, &tss, &cmsg_flags);★ The actual recvmsg work.
release_sock(sk);
if ((cmsg_flags || msg->msg_get_inq) && ret >= 0) {
if (cmsg_flags & TCP_CMSG_TS)
tcp_recv_timestamp(msg, sk, &tss);
if (msg->msg_get_inq) {
msg->msg_inq = tcp_inq_hint(sk);
if (cmsg_flags & TCP_CMSG_INQ)
put_cmsg(msg, SOL_TCP, TCP_CM_INQ,
sizeof(msg->msg_inq), &msg->msg_inq);
}
}
return ret;
}
EXPORT_SYMBOL(tcp_recvmsg);
/*
* This routine copies from a sock struct into the user buffer.
*
* Technical note: in 2.3 we work on _locked_ socket, so that
* tricks with *seq access order and skb->users are not required.
* Probably, code can be easily improved even more.
*/
static int tcp_recvmsg_locked(struct sock *sk, struct msghdr *msg, size_t len,
int flags, struct scm_timestamping_internal *tss,
int *cmsg_flags)
{
struct tcp_sock *tp = tcp_sk(sk);
int copied = 0;
u32 peek_seq;
u32 *seq;
unsigned long used;
int err;
int target; /* Read at least this many bytes */
long timeo;
struct sk_buff *skb, *last;
u32 urg_hole = 0;
err = -ENOTCONN;
if (sk->sk_state == TCP_LISTEN)
goto out;
if (tp->recvmsg_inq) {
*cmsg_flags = TCP_CMSG_INQ;
msg->msg_get_inq = 1;
}
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
/* Urgent data needs to be handled specially. */
if (flags & MSG_OOB)
goto recv_urg;
if (unlikely(tp->repair)) {
err = -EPERM;
if (!(flags & MSG_PEEK))
goto out;
if (tp->repair_queue == TCP_SEND_QUEUE)
goto recv_sndq;
err = -EINVAL;
if (tp->repair_queue == TCP_NO_QUEUE)
goto out;
/* 'common' recv queue MSG_PEEK-ing */
}
seq = &tp->copied_seq;
if (flags & MSG_PEEK) {
peek_seq = tp->copied_seq;
seq = &peek_seq;
}
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
do {
u32 offset;
/* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
if (unlikely(tp->urg_data) && tp->urg_seq == *seq) {
if (copied)
break;
if (signal_pending(current)) {
copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
break;
}
}
/* Next get a buffer. */
last = skb_peek_tail(&sk->sk_receive_queue);★ Walk the packets on the socket's receive_queue in order from the head.
skb_queue_walk(&sk->sk_receive_queue, skb) {
last = skb;
/* Now that we have two receive queues this
* shouldn't happen.
*/
if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
"TCP recvmsg seq # bug: copied %X, seq %X, rcvnxt %X, fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
flags))
break;
offset = *seq - TCP_SKB_CB(skb)->seq;
if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
pr_err_once("%s: found a SYN, please report !\n", __func__);
offset--;
}
if (offset < skb->len)
goto found_ok_skb;
if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
goto found_fin_ok;
WARN(!(flags & MSG_PEEK),
"TCP recvmsg seq # bug 2: copied %X, seq %X, rcvnxt %X, fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
}
/* Well, if we have backlog, try to process it now yet. */
if (copied >= target && !READ_ONCE(sk->sk_backlog.tail))
break;
if (copied) {
if (!timeo ||
sk->sk_err ||
sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
signal_pending(current))
break;
} else {
if (sock_flag(sk, SOCK_DONE))
break;
if (sk->sk_err) {
copied = sock_error(sk);
break;
}
if (sk->sk_shutdown & RCV_SHUTDOWN)
break;
if (sk->sk_state == TCP_CLOSE) {
/* This occurs when user tries to read
* from never connected socket.
*/
copied = -ENOTCONN;
break;
}
if (!timeo) {
copied = -EAGAIN;
break;
}
if (signal_pending(current)) {
copied = sock_intr_errno(timeo);
break;
}
}
if (copied >= target) {
/* Do not sleep, just process backlog. */
__sk_flush_backlog(sk);
} else {
tcp_cleanup_rbuf(sk, copied);
sk_wait_data(sk, &timeo, last);
}
if ((flags & MSG_PEEK) &&
(peek_seq - copied - urg_hole != tp->copied_seq)) {
net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n",
current->comm,
task_pid_nr(current));
peek_seq = tp->copied_seq;
}
continue;
found_ok_skb:
/* Ok so how much can we use? */
used = skb->len - offset;
if (len < used)
used = len;
/* Do we have urgent data here? */
if (unlikely(tp->urg_data)) {
u32 urg_offset = tp->urg_seq - *seq;
if (urg_offset < used) {
if (!urg_offset) {
if (!sock_flag(sk, SOCK_URGINLINE)) {
WRITE_ONCE(*seq, *seq + 1);
urg_hole++;
offset++;
used--;
if (!used)
goto skip_copy;
}
} else
used = urg_offset;
}
}
if (!(flags & MSG_TRUNC)) {
err = skb_copy_datagram_msg(skb, offset, msg, used);★ Copy the packet data to user space.
if (err) {
/* Exception. Bailout! */
if (!copied)
copied = -EFAULT;
break;
}
}
WRITE_ONCE(*seq, *seq + used);
copied += used;
len -= used;
tcp_rcv_space_adjust(sk);
skip_copy:
if (unlikely(tp->urg_data) && after(tp->copied_seq, tp->urg_seq)) {
WRITE_ONCE(tp->urg_data, 0);
tcp_fast_path_check(sk);
}
if (TCP_SKB_CB(skb)->has_rxtstamp) {
tcp_update_recv_tstamps(skb, tss);
*cmsg_flags |= TCP_CMSG_TS;
}
if (used + offset < skb->len)
continue;
if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
goto found_fin_ok;
if (!(flags & MSG_PEEK))
tcp_eat_recv_skb(sk, skb);
continue;
found_fin_ok:
/* Process the FIN. */
WRITE_ONCE(*seq, *seq + 1);
if (!(flags & MSG_PEEK))
tcp_eat_recv_skb(sk, skb);
break;
} while (len > 0);
/* According to UNIX98, msg_name/msg_namelen are ignored
* on connected socket. I was just happy when found this 8) --ANK
*/
/* Clean up data we have read: This will do ACK frames. */
tcp_cleanup_rbuf(sk, copied);
return copied;
out:
return err;
recv_urg:
err = tcp_recv_urg(sk, msg, len, flags);
goto out;
recv_sndq:
err = tcp_peek_sndq(sk, msg, len);
goto out;
}
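The target computed by sock_rcvlowat() above can be raised from user space with SO_RCVLOWAT; a minimal sketch, assuming fd is a connected TCP socket:

#include <sys/socket.h>

/* Ask the kernel not to wake a blocking recv() until at least 4 KiB is queued,
 * i.e. raise the "target" that tcp_recvmsg_locked() reads via sock_rcvlowat(). */
static int set_rcv_low_watermark(int fd)
{
	int lowat = 4096;

	return setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, &lowat, sizeof(lowat));
}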
/**
* struct socket - general BSD socket
* @state: socket state (%SS_CONNECTED, etc)
* @type: socket type (%SOCK_STREAM, etc)
* @flags: socket flags (%SOCK_NOSPACE, etc)
* @ops: protocol specific socket operations
* @file: File back pointer for gc
* @sk: internal networking protocol agnostic socket representation
* @wq: wait queue for several uses
*/
struct socket {
socket_state state;
short type;
unsigned long flags;
struct file *file;
struct sock *sk;
const struct proto_ops *ops;
struct socket_wq wq;
};
struct sock {
/*
* Now struct inet_timewait_sock also uses sock_common, so please just
* don't add nothing before this first member (__sk_common) --acme
*/
struct sock_common __sk_common;
#define sk_node __sk_common.skc_node
#define sk_nulls_node __sk_common.skc_nulls_node
#define sk_refcnt __sk_common.skc_refcnt
#define sk_tx_queue_mapping __sk_common.skc_tx_queue_mapping
#ifdef CONFIG_SOCK_RX_QUEUE_MAPPING
#define sk_rx_queue_mapping __sk_common.skc_rx_queue_mapping
#endif
#define sk_dontcopy_begin __sk_common.skc_dontcopy_begin
#define sk_dontcopy_end __sk_common.skc_dontcopy_end
#define sk_hash __sk_common.skc_hash
#define sk_portpair __sk_common.skc_portpair
#define sk_num __sk_common.skc_num
#define sk_dport __sk_common.skc_dport
#define sk_addrpair __sk_common.skc_addrpair
#define sk_daddr __sk_common.skc_daddr
#define sk_rcv_saddr __sk_common.skc_rcv_saddr
#define sk_family __sk_common.skc_family
#define sk_state __sk_common.skc_state
#define sk_reuse __sk_common.skc_reuse
#define sk_reuseport __sk_common.skc_reuseport
#define sk_ipv6only __sk_common.skc_ipv6only
#define sk_net_refcnt __sk_common.skc_net_refcnt
#define sk_bound_dev_if __sk_common.skc_bound_dev_if
#define sk_bind_node __sk_common.skc_bind_node
#define sk_prot __sk_common.skc_prot
#define sk_net __sk_common.skc_net
#define sk_v6_daddr __sk_common.skc_v6_daddr
#define sk_v6_rcv_saddr __sk_common.skc_v6_rcv_saddr
#define sk_cookie __sk_common.skc_cookie
#define sk_incoming_cpu __sk_common.skc_incoming_cpu
#define sk_flags __sk_common.skc_flags
#define sk_rxhash __sk_common.skc_rxhash
/* early demux fields */
struct dst_entry __rcu *sk_rx_dst;
int sk_rx_dst_ifindex;
u32 sk_rx_dst_cookie;
socket_lock_t sk_lock;
atomic_t sk_drops;
int sk_rcvlowat;
struct sk_buff_head sk_error_queue;
struct sk_buff_head sk_receive_queue;
/*
* The backlog queue is special, it is always used with
* the per-socket spinlock held and requires low latency
* access. Therefore we special case it's implementation.
* Note : rmem_alloc is in this structure to fill a hole
* on 64bit arches, not because its logically part of
* backlog.
*/
struct {
atomic_t rmem_alloc;
int len;
struct sk_buff *head;
struct sk_buff *tail;
} sk_backlog;
#define sk_rmem_alloc sk_backlog.rmem_alloc
int sk_forward_alloc;
u32 sk_reserved_mem;
#ifdef CONFIG_NET_RX_BUSY_POLL
unsigned int sk_ll_usec;
/* ===== mostly read cache line ===== */
unsigned int sk_napi_id;
#endif
int sk_rcvbuf;
struct sk_filter __rcu *sk_filter;
union {
struct socket_wq __rcu *sk_wq;
/* private: */
struct socket_wq *sk_wq_raw;
/* public: */
};
#ifdef CONFIG_XFRM
struct xfrm_policy __rcu *sk_policy[2];
#endif
struct dst_entry __rcu *sk_dst_cache;
atomic_t sk_omem_alloc;
int sk_sndbuf;
/* ===== cache line for TX ===== */
int sk_wmem_queued;
refcount_t sk_wmem_alloc;
unsigned long sk_tsq_flags;
union {
struct sk_buff *sk_send_head;
struct rb_root tcp_rtx_queue;
};
struct sk_buff_head sk_write_queue;
__s32 sk_peek_off;
int sk_write_pending;
__u32 sk_dst_pending_confirm;
u32 sk_pacing_status; /* see enum sk_pacing */
long sk_sndtimeo;
struct timer_list sk_timer;
__u32 sk_priority;
__u32 sk_mark;
unsigned long sk_pacing_rate; /* bytes per second */
unsigned long sk_max_pacing_rate;
struct page_frag sk_frag;
netdev_features_t sk_route_caps;
int sk_gso_type;
unsigned int sk_gso_max_size;
gfp_t sk_allocation;
__u32 sk_txhash;
/*
* Because of non atomicity rules, all
* changes are protected by socket lock.
*/
u8 sk_gso_disabled : 1,
sk_kern_sock : 1,
sk_no_check_tx : 1,
sk_no_check_rx : 1,
sk_userlocks : 4;
u8 sk_pacing_shift;
u16 sk_type;
u16 sk_protocol;
u16 sk_gso_max_segs;
unsigned long sk_lingertime;
struct proto *sk_prot_creator;
rwlock_t sk_callback_lock;
int sk_err,
sk_err_soft;
u32 sk_ack_backlog;
u32 sk_max_ack_backlog;
kuid_t sk_uid;
u8 sk_txrehash;
#ifdef CONFIG_NET_RX_BUSY_POLL
u8 sk_prefer_busy_poll;
u16 sk_busy_poll_budget;
#endif
spinlock_t sk_peer_lock;
int sk_bind_phc;
struct pid *sk_peer_pid;
const struct cred *sk_peer_cred;
long sk_rcvtimeo;
ktime_t sk_stamp;
#if BITS_PER_LONG==32
seqlock_t sk_stamp_seq;
#endif
atomic_t sk_tskey;
atomic_t sk_zckey;
u32 sk_tsflags;
u8 sk_shutdown;
u8 sk_clockid;
u8 sk_txtime_deadline_mode : 1,
sk_txtime_report_errors : 1,
sk_txtime_unused : 6;
bool sk_use_task_frag;
struct socket *sk_socket;
void *sk_user_data;
#ifdef CONFIG_SECURITY
void *sk_security;
#endif
struct sock_cgroup_data sk_cgrp_data;
struct mem_cgroup *sk_memcg;
void (*sk_state_change)(struct sock *sk);
void (*sk_data_ready)(struct sock *sk);
void (*sk_write_space)(struct sock *sk);
void (*sk_error_report)(struct sock *sk);
int (*sk_backlog_rcv)(struct sock *sk,
struct sk_buff *skb);
#ifdef CONFIG_SOCK_VALIDATE_XMIT
struct sk_buff* (*sk_validate_xmit_skb)(struct sock *sk,
struct net_device *dev,
struct sk_buff *skb);
#endif
void (*sk_destruct)(struct sock *sk);
struct sock_reuseport __rcu *sk_reuseport_cb;
#ifdef CONFIG_BPF_SYSCALL
struct bpf_local_storage __rcu *sk_bpf_storage;
#endif
struct rcu_head sk_rcu;
netns_tracker ns_tracker;
struct hlist_node sk_bind2_node;
};
/**
* struct sock_common - minimal network layer representation of sockets
* @skc_daddr: Foreign IPv4 addr
* @skc_rcv_saddr: Bound local IPv4 addr
* @skc_addrpair: 8-byte-aligned __u64 union of @skc_daddr & @skc_rcv_saddr
* @skc_hash: hash value used with various protocol lookup tables
* @skc_u16hashes: two u16 hash values used by UDP lookup tables
* @skc_dport: placeholder for inet_dport/tw_dport
* @skc_num: placeholder for inet_num/tw_num
* @skc_portpair: __u32 union of @skc_dport & @skc_num
* @skc_family: network address family
* @skc_state: Connection state
* @skc_reuse: %SO_REUSEADDR setting
* @skc_reuseport: %SO_REUSEPORT setting
* @skc_ipv6only: socket is IPV6 only
* @skc_net_refcnt: socket is using net ref counting
* @skc_bound_dev_if: bound device index if != 0
* @skc_bind_node: bind hash linkage for various protocol lookup tables
* @skc_portaddr_node: second hash linkage for UDP/UDP-Lite protocol
* @skc_prot: protocol handlers inside a network family
* @skc_net: reference to the network namespace of this socket
* @skc_v6_daddr: IPV6 destination address
* @skc_v6_rcv_saddr: IPV6 source address
* @skc_cookie: socket's cookie value
* @skc_node: main hash linkage for various protocol lookup tables
* @skc_nulls_node: main hash linkage for TCP/UDP/UDP-Lite protocol
* @skc_tx_queue_mapping: tx queue number for this connection
* @skc_rx_queue_mapping: rx queue number for this connection
* @skc_flags: place holder for sk_flags
* %SO_LINGER (l_onoff), %SO_BROADCAST, %SO_KEEPALIVE,
* %SO_OOBINLINE settings, %SO_TIMESTAMPING settings
* @skc_listener: connection request listener socket (aka rsk_listener)
* [union with @skc_flags]
* @skc_tw_dr: (aka tw_dr) ptr to &struct inet_timewait_death_row
* [union with @skc_flags]
* @skc_incoming_cpu: record/match cpu processing incoming packets
* @skc_rcv_wnd: (aka rsk_rcv_wnd) TCP receive window size (possibly scaled)
* [union with @skc_incoming_cpu]
* @skc_tw_rcv_nxt: (aka tw_rcv_nxt) TCP window next expected seq number
* [union with @skc_incoming_cpu]
* @skc_refcnt: reference count
*
* This is the minimal network layer representation of sockets, the header
* for struct sock and struct inet_timewait_sock.
*/
struct sock_common {
union {
__addrpair skc_addrpair;
struct {
__be32 skc_daddr;
__be32 skc_rcv_saddr;
};
};
union {
unsigned int skc_hash;
__u16 skc_u16hashes[2];
};
/* skc_dport && skc_num must be grouped as well */
union {
__portpair skc_portpair;
struct {
__be16 skc_dport;
__u16 skc_num;
};
};
unsigned short skc_family;
volatile unsigned char skc_state;
unsigned char skc_reuse:4;
unsigned char skc_reuseport:1;
unsigned char skc_ipv6only:1;
unsigned char skc_net_refcnt:1;
int skc_bound_dev_if;
union {
struct hlist_node skc_bind_node;
struct hlist_node skc_portaddr_node;
};
struct proto *skc_prot;
possible_net_t skc_net;
#if IS_ENABLED(CONFIG_IPV6)
struct in6_addr skc_v6_daddr;
struct in6_addr skc_v6_rcv_saddr;
#endif
atomic64_t skc_cookie;
/* following fields are padding to force
* offset(struct sock, sk_refcnt) == 128 on 64bit arches
* assuming IPV6 is enabled. We use this padding differently
* for different kind of 'sockets'
*/
union {
unsigned long skc_flags;
struct sock *skc_listener; /* request_sock */
struct inet_timewait_death_row *skc_tw_dr; /* inet_timewait_sock */
};
/*
* fields between dontcopy_begin/dontcopy_end
* are not copied in sock_copy()
*/
/* private: */
int skc_dontcopy_begin[0];
/* public: */
union {
struct hlist_node skc_node;
struct hlist_nulls_node skc_nulls_node;
};
unsigned short skc_tx_queue_mapping;
#ifdef CONFIG_SOCK_RX_QUEUE_MAPPING
unsigned short skc_rx_queue_mapping;
#endif
union {
int skc_incoming_cpu;
u32 skc_rcv_wnd;
u32 skc_tw_rcv_nxt; /* struct tcp_timewait_sock */
};
refcount_t skc_refcnt;
/* private: */
int skc_dontcopy_end[0];
union {
u32 skc_rxhash;
u32 skc_window_clamp;
u32 skc_tw_snd_nxt; /* struct tcp_timewait_sock */
};
/* public: */
};
SPDK behavior
Backtrace
Thread 1 "reactor_0" hit Breakpoint 2, __GI___readv (fd=73, iov=0x7fffffffda70, iovcnt=2) at ../sysdeps/unix/sysv/linux/readv.c:25
25 ../sysdeps/unix/sysv/linux/readv.c: No such file or directory.
(gdb) bt
#0 __GI___readv (fd=73, iov=0x7fffffffda70, iovcnt=2) at ../sysdeps/unix/sysv/linux/readv.c:25
#1 0x0000555555626d84 in posix_sock_read (sock=0x555555ed5090) at posix.c:1401
#2 0x00005555556270d6 in posix_sock_readv (_sock=0x555555ed5090, iov=0x7fffffffdb10, iovcnt=1) at posix.c:1476
#3 0x000055555562713e in posix_sock_recv (sock=0x555555ed5090, buf=0x2000366f43c8, len=8) at posix.c:1493
#4 0x00005555556e58b0 in spdk_sock_recv (sock=0x555555ed5090, buf=0x2000366f43c8, len=8) at sock.c:458
#5 0x000055555564e294 in nvme_tcp_read_data (sock=0x555555ed5090, bytes=8, buf=0x2000366f43c8)
at /home/takayuki/repos/spdk/include/spdk_internal/nvme_tcp.h:382
#6 0x0000555555654074 in nvmf_tcp_sock_process (tqpair=0x555555ed5e70) at tcp.c:2191
#7 0x0000555555656bd3 in nvmf_tcp_sock_cb (arg=0x555555ed5e70, group=0x555555cbbb60, sock=0x555555ed5090) at tcp.c:3056
#8 0x00005555556e60d2 in sock_group_impl_poll_count (group_impl=0x555555ec22a0, group=0x555555cbbb60, max_events=32) at sock.c:691
#9 0x00005555556e616c in spdk_sock_group_poll_count (group=0x555555cbbb60, max_events=32) at sock.c:717
#10 0x00005555556e5fcb in spdk_sock_group_poll (group=0x555555cbbb60) at sock.c:668
#11 0x0000555555657200 in nvmf_tcp_poll_group_poll (group=0x555555d3acf0) at tcp.c:3209
#12 0x000055555564cd40 in nvmf_transport_poll_group_poll (group=0x555555d3acf0) at transport.c:666
#13 0x0000555555641a6d in nvmf_poll_group_poll (ctx=0x555555d21550) at nvmf.c:71
#14 0x00005555556f00ba in thread_execute_poller (thread=0x555555d21160, poller=0x555555d21600) at thread.c:946
#15 0x00005555556f0640 in thread_poll (thread=0x555555d21160, max_msgs=0, now=4449678885613) at thread.c:1072
#16 0x00005555556f08ef in spdk_thread_poll (thread=0x555555d21160, max_msgs=0, now=4449678885613) at thread.c:1156
#17 0x00005555556b2542 in _reactor_run (reactor=0x555555cfb1c0) at reactor.c:903
#18 0x00005555556b2634 in reactor_run (arg=0x555555cfb1c0) at reactor.c:941
#19 0x00005555556b2a8f in spdk_reactors_start () at reactor.c:1053
#20 0x00005555556aee7a in spdk_app_start (opts_user=0x7fffffffe340, start_fn=0x555555570d7a <nvmf_tgt_started>, arg1=0x0) at app.c:771
#21 0x0000555555570ed4 in main (argc=3, argv=0x7fffffffe518) at nvmf_main.c:47
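A gdb session along these lines reproduces the trace; the nvmf_tgt binary path and the target's startup arguments are environment-specific:

$ gdb ./build/bin/nvmf_tgt
(gdb) break readv
(gdb) run ...            # start the target with your usual arguments, then drive I/O over NVMe/TCP
(gdb) bt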
nvmf_tcp_sock_process
In SPDK, nvmf_tcp_sock_process processes the PDU data.
static int
nvmf_tcp_sock_process(struct spdk_nvmf_tcp_qpair *tqpair)
{
int rc = 0;
struct nvme_tcp_pdu *pdu;
enum nvme_tcp_pdu_recv_state prev_state;
uint32_t data_len;
struct spdk_nvmf_tcp_transport *ttransport = SPDK_CONTAINEROF(tqpair->qpair.transport,
struct spdk_nvmf_tcp_transport, transport);
/* The loop here is to allow for several back-to-back state changes. */
do {
prev_state = tqpair->recv_state;
SPDK_DEBUGLOG(nvmf_tcp, "tqpair(%p) recv pdu entering state %d\n", tqpair, prev_state);
pdu = tqpair->pdu_in_progress;
assert(pdu || tqpair->recv_state == NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_READY);
switch (tqpair->recv_state) {
/* Wait for the common header */
case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_READY:
if (!pdu) {
pdu = SLIST_FIRST(&tqpair->tcp_pdu_free_queue);
if (spdk_unlikely(!pdu)) {
return NVME_TCP_PDU_IN_PROGRESS;
}
SLIST_REMOVE_HEAD(&tqpair->tcp_pdu_free_queue, slist);
tqpair->pdu_in_progress = pdu;
}
memset(pdu, 0, offsetof(struct nvme_tcp_pdu, qpair));
nvmf_tcp_qpair_set_recv_state(tqpair, NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_CH);
/* FALLTHROUGH */
case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_CH:
if (spdk_unlikely(tqpair->state == NVME_TCP_QPAIR_STATE_INITIALIZING)) {
return rc;
}
rc = nvme_tcp_read_data(tqpair->sock,
sizeof(struct spdk_nvme_tcp_common_pdu_hdr) - pdu->ch_valid_bytes,
(void *)&pdu->hdr.common + pdu->ch_valid_bytes);★ Copy the not-yet-received part of the CH (common header) into pdu->hdr.common.
if (rc < 0) {
SPDK_DEBUGLOG(nvmf_tcp, "will disconnect tqpair=%p\n", tqpair);
return NVME_TCP_PDU_FATAL;
} else if (rc > 0) {
pdu->ch_valid_bytes += rc;
spdk_trace_record(TRACE_TCP_READ_FROM_SOCKET_DONE, tqpair->qpair.qid, rc, 0, tqpair);
}
if (pdu->ch_valid_bytes < sizeof(struct spdk_nvme_tcp_common_pdu_hdr)) {
return NVME_TCP_PDU_IN_PROGRESS;
}
/* The command header of this PDU has now been read from the socket. */
nvmf_tcp_pdu_ch_handle(tqpair);
break;
/* Wait for the pdu specific header */
case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_PSH:
rc = nvme_tcp_read_data(tqpair->sock,
pdu->psh_len - pdu->psh_valid_bytes,
(void *)&pdu->hdr.raw + sizeof(struct spdk_nvme_tcp_common_pdu_hdr) + pdu->psh_valid_bytes);
if (rc < 0) {
return NVME_TCP_PDU_FATAL;
} else if (rc > 0) {
spdk_trace_record(TRACE_TCP_READ_FROM_SOCKET_DONE, tqpair->qpair.qid, rc, 0, tqpair);
pdu->psh_valid_bytes += rc;
}
if (pdu->psh_valid_bytes < pdu->psh_len) {
return NVME_TCP_PDU_IN_PROGRESS;
}
/* All header(ch, psh, head digist) of this PDU has now been read from the socket. */
nvmf_tcp_pdu_psh_handle(tqpair, ttransport);
break;
/* Wait for the req slot */
case NVME_TCP_PDU_RECV_STATE_AWAIT_REQ:
nvmf_tcp_capsule_cmd_hdr_handle(ttransport, tqpair, pdu);
break;
case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_PAYLOAD:
/* check whether the data is valid, if not we just return */
if (!pdu->data_len) {
return NVME_TCP_PDU_IN_PROGRESS;
}
data_len = pdu->data_len;
/* data digest */
if (spdk_unlikely((pdu->hdr.common.pdu_type != SPDK_NVME_TCP_PDU_TYPE_H2C_TERM_REQ) &&
tqpair->host_ddgst_enable)) {
data_len += SPDK_NVME_TCP_DIGEST_LEN;
pdu->ddgst_enable = true;
}
rc = nvme_tcp_read_payload_data(tqpair->sock, pdu);★ Receive the payload.
if (rc < 0) {
return NVME_TCP_PDU_FATAL;
}
pdu->rw_offset += rc;
if (pdu->rw_offset < data_len) {
return NVME_TCP_PDU_IN_PROGRESS;
}
/* Generate and insert DIF to whole data block received if DIF is enabled */
if (spdk_unlikely(pdu->dif_ctx != NULL) &&
spdk_dif_generate_stream(pdu->data_iov, pdu->data_iovcnt, 0, data_len,
pdu->dif_ctx) != 0) {
SPDK_ERRLOG("DIF generate failed\n");
return NVME_TCP_PDU_FATAL;
}
/* All of this PDU has now been read from the socket. */
nvmf_tcp_pdu_payload_handle(tqpair, pdu);
break;
case NVME_TCP_PDU_RECV_STATE_ERROR:
if (!spdk_sock_is_connected(tqpair->sock)) {
return NVME_TCP_PDU_FATAL;
}
break;
default:
SPDK_ERRLOG("The state(%d) is invalid\n", tqpair->recv_state);
abort();
break;
}
} while (tqpair->recv_state != prev_state);
return rc;
}
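The AWAIT_PDU_CH/AWAIT_PDU_PSH branches above accumulate partial reads across calls (pdu->ch_valid_bytes, pdu->psh_valid_bytes). The same resumable-read pattern in isolation, with all names hypothetical rather than SPDK APIs:

#include <errno.h>
#include <stdint.h>
#include <unistd.h>

struct hdr_reader {
	uint8_t buf[8];		/* fixed-size header, e.g. an 8-byte common header */
	size_t valid_bytes;	/* plays the role of pdu->ch_valid_bytes */
};

/* Returns 1 once the header is complete, 0 to retry later, -1 on error/EOF. */
static int
read_header_nonblocking(int fd, struct hdr_reader *r)
{
	ssize_t rc = read(fd, r->buf + r->valid_bytes,
			  sizeof(r->buf) - r->valid_bytes);

	if (rc < 0) {
		return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
	}
	if (rc == 0) {
		return -1;	/* peer closed the connection */
	}
	r->valid_bytes += rc;
	return r->valid_bytes == sizeof(r->buf) ? 1 : 0;
}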
nvme_tcp_read_payload_data
nvme_tcp_read_payload_data reads the SGL in the received PDU and builds an iov.
The iov entries point at the tcp_req buffer area that was set up by nvme_tcp_pdu_set_data_buf.
static int
nvme_tcp_read_payload_data(struct spdk_sock *sock, struct nvme_tcp_pdu *pdu)
{
struct iovec iov[NVME_TCP_MAX_SGL_DESCRIPTORS + 1]; /* NVME_TCP_MAX_SGL_DESCRIPTORS is 16 */
int iovcnt;
iovcnt = nvme_tcp_build_payload_iovs(iov, NVME_TCP_MAX_SGL_DESCRIPTORS + 1, pdu,
pdu->ddgst_enable, NULL);★ Build the iov for the payload.
assert(iovcnt >= 0);
return nvme_tcp_readv_data(sock, iov, iovcnt);★ Read the socket data into the iov; this is where the memory copy happens.
}
static int
nvme_tcp_build_payload_iovs(struct iovec *iov, int iovcnt, struct nvme_tcp_pdu *pdu,
bool ddgst_enable, uint32_t *_mapped_length)
{
struct spdk_iov_sgl sgl;
if (iovcnt == 0) {
return 0;
}
spdk_iov_sgl_init(&sgl, iov, iovcnt, pdu->rw_offset);
if (spdk_likely(!pdu->dif_ctx)) {
if (!_nvme_tcp_sgl_append_multi(&sgl, pdu->data_iov, pdu->data_iovcnt)) {★ Append the PDU data buffers to the SGL.
goto end;
}
} else {
if (!_nvme_tcp_sgl_append_multi_with_md(&sgl, pdu->data_iov, pdu->data_iovcnt,
pdu->data_len, pdu->dif_ctx)) {
goto end;
}
}
/* Data Digest */
if (ddgst_enable) {
spdk_iov_sgl_append(&sgl, pdu->data_digest, SPDK_NVME_TCP_DIGEST_LEN);
}
end:
if (_mapped_length != NULL) {
*_mapped_length = sgl.total_size;
}
return iovcnt - sgl.iovcnt;
}
static inline bool
_nvme_tcp_sgl_append_multi(struct spdk_iov_sgl *s, struct iovec *iov, int iovcnt)
{
int i;
for (i = 0; i < iovcnt; i++) {★ Walk the SGL entries from the PDU and make the spdk_iov_sgl's iov entries point at their contents.
if (!spdk_iov_sgl_append(s, iov[i].iov_base, iov[i].iov_len)) {
return false;
}
}
return true;
}
/**
* Append the data to the struct spdk_iov_sgl pointed by s
*
* \param s the address of the struct spdk_iov_sgl
* \param data the data buffer to be appended
* \param data_len the length of the data.
*
* \return true if all the data is appended.
*/
static inline bool
spdk_iov_sgl_append(struct spdk_iov_sgl *s, uint8_t *data, uint32_t data_len)
{
if (s->iov_offset >= data_len) {
s->iov_offset -= data_len;
} else {
assert(s->iovcnt > 0);
s->iov->iov_base = data + s->iov_offset;
s->iov->iov_len = data_len - s->iov_offset;
s->total_size += data_len - s->iov_offset;
s->iov_offset = 0;
s->iov++;
s->iovcnt--;
if (s->iovcnt == 0) {
return false;
}
}
return true;
}
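A worked example of the append logic above: building at most 3 iov entries from two data buffers while skipping 100 bytes that were already received (buffer names and sizes are hypothetical; the offset plays the role of pdu->rw_offset):

struct iovec out[3];
struct spdk_iov_sgl sgl;
uint8_t buf_a[512], buf_b[512];

spdk_iov_sgl_init(&sgl, out, 3, 100);			/* iov_offset = 100 */
spdk_iov_sgl_append(&sgl, buf_a, sizeof(buf_a));	/* out[0] = buf_a + 100, len 412 */
spdk_iov_sgl_append(&sgl, buf_b, sizeof(buf_b));	/* out[1] = buf_b, len 512 */
/* entries used = 3 - sgl.iovcnt = 2; sgl.total_size = 924 */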
nvme_tcp_readv_data
nvme_tcp_readv_data receives data from the socket with recv() when there is a single iovec entry and with readv() when there are several. In doing so, the data is copied from the kernel-space receive buffer into the iov.
static int
nvme_tcp_readv_data(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
int ret;
assert(sock != NULL);
if (iov == NULL || iovcnt == 0) {
return 0;
}
if (iovcnt == 1) {
return nvme_tcp_read_data(sock, iov->iov_len, iov->iov_base);
}
ret = spdk_sock_readv(sock, iov, iovcnt);
if (ret > 0) {
return ret;
}
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
}
/* For connect reset issue, do not output error log */
if (errno != ECONNRESET) {
SPDK_ERRLOG("spdk_sock_readv() failed, errno %d: %s\n",
errno, spdk_strerror(errno));
}
}
/* connection closed */
return NVME_TCP_CONNECTION_FATAL;
}
struct nvme_tcp_pdu
The buffers that SPDK manages are tracked by the data_iov member of struct nvme_tcp_pdu.
struct nvme_tcp_pdu {
union {
/* to hold error pdu data */
uint8_t raw[SPDK_NVME_TCP_TERM_REQ_PDU_MAX_SIZE];
struct spdk_nvme_tcp_common_pdu_hdr common;
struct spdk_nvme_tcp_ic_req ic_req;
struct spdk_nvme_tcp_term_req_hdr term_req;
struct spdk_nvme_tcp_cmd capsule_cmd;
struct spdk_nvme_tcp_h2c_data_hdr h2c_data;
struct spdk_nvme_tcp_ic_resp ic_resp;
struct spdk_nvme_tcp_rsp capsule_resp;
struct spdk_nvme_tcp_c2h_data_hdr c2h_data;
struct spdk_nvme_tcp_r2t_hdr r2t;
} hdr;
bool has_hdgst;
bool ddgst_enable;
uint32_t data_digest_crc32;
uint8_t data_digest[SPDK_NVME_TCP_DIGEST_LEN];
uint8_t ch_valid_bytes;
uint8_t psh_valid_bytes;
uint8_t psh_len;
nvme_tcp_qpair_xfer_complete_cb cb_fn;
void *cb_arg;
/* The sock request ends with a 0 length iovec. Place the actual iovec immediately
* after it. There is a static assert below to check if the compiler inserted
* any unwanted padding */
struct spdk_sock_request sock_req;
struct iovec iov[NVME_TCP_MAX_SGL_DESCRIPTORS * 2];
struct iovec data_iov[NVME_TCP_MAX_SGL_DESCRIPTORS];
uint32_t data_iovcnt;
uint32_t data_len;
uint32_t rw_offset;
TAILQ_ENTRY(nvme_tcp_pdu) tailq;
uint32_t remaining;
uint32_t padding_len;
struct spdk_dif_ctx *dif_ctx;
void *req; /* data tied to a tcp request */
void *qpair;
SLIST_ENTRY(nvme_tcp_pdu) slist;
};
SPDK_STATIC_ASSERT(offsetof(struct nvme_tcp_pdu,
sock_req) + sizeof(struct spdk_sock_request) == offsetof(struct nvme_tcp_pdu, iov),
"Compiler inserted padding between iov and sock_req");
nvme_tcp_pdu_set_data_buf
The data_iov of struct nvme_tcp_pdu is set by nvme_tcp_pdu_set_data_buf. nvmf_tcp_h2c_data_hdr_handle points data_iov at the tcp_req buffer that was allocated when the I/O queue was initialized. That buffer (tcp_req->buf) is in fact DPDK hugepage memory allocated with spdk_zmalloc (a DMA-capable region).
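For reference, a sketch of how such a DMA-capable buffer is obtained from hugepage-backed memory with spdk_zmalloc; the size and alignment here are illustrative, not the values SPDK uses:

#include "spdk/env.h"

static void *alloc_dma_buf(void)
{
	/* 128 KiB, 4 KiB aligned, from any NUMA node, usable for device DMA. */
	void *buf = spdk_zmalloc(128 * 1024, 0x1000,
				 NULL,			/* no physical-address out-param */
				 SPDK_ENV_SOCKET_ID_ANY,
				 SPDK_MALLOC_DMA);
	if (buf == NULL) {
		/* hugepage pool exhausted */
		return NULL;
	}
	return buf;	/* release later with spdk_free(buf) */
}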
static void
nvmf_tcp_h2c_data_hdr_handle(struct spdk_nvmf_tcp_transport *ttransport,
struct spdk_nvmf_tcp_qpair *tqpair,
struct nvme_tcp_pdu *pdu)
{
struct spdk_nvmf_tcp_req *tcp_req;
uint32_t error_offset = 0;
enum spdk_nvme_tcp_term_req_fes fes = 0;
struct spdk_nvme_tcp_h2c_data_hdr *h2c_data;
h2c_data = &pdu->hdr.h2c_data;
SPDK_DEBUGLOG(nvmf_tcp, "tqpair=%p, r2t_info: datao=%u, datal=%u, cccid=%u, ttag=%u\n",
tqpair, h2c_data->datao, h2c_data->datal, h2c_data->cccid, h2c_data->ttag);
if (h2c_data->ttag > tqpair->resource_count) {
SPDK_DEBUGLOG(nvmf_tcp, "ttag %u is larger than allowed %u.\n", h2c_data->ttag,
tqpair->resource_count);
fes = SPDK_NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR;
error_offset = offsetof(struct spdk_nvme_tcp_h2c_data_hdr, ttag);
goto err;
}
tcp_req = &tqpair->reqs[h2c_data->ttag - 1];
if (spdk_unlikely(tcp_req->state != TCP_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER &&
tcp_req->state != TCP_REQUEST_STATE_AWAITING_R2T_ACK)) {
SPDK_DEBUGLOG(nvmf_tcp, "tcp_req(%p), tqpair=%p, has error state in %d\n", tcp_req, tqpair,
tcp_req->state);
fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD;
error_offset = offsetof(struct spdk_nvme_tcp_h2c_data_hdr, ttag);
goto err;
}
if (spdk_unlikely(tcp_req->req.cmd->nvme_cmd.cid != h2c_data->cccid)) {
SPDK_DEBUGLOG(nvmf_tcp, "tcp_req(%p), tqpair=%p, expected %u but %u for cccid.\n", tcp_req, tqpair,
tcp_req->req.cmd->nvme_cmd.cid, h2c_data->cccid);
fes = SPDK_NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR;
error_offset = offsetof(struct spdk_nvme_tcp_h2c_data_hdr, cccid);
goto err;
}
if (tcp_req->h2c_offset != h2c_data->datao) {
SPDK_DEBUGLOG(nvmf_tcp,
"tcp_req(%p), tqpair=%p, expected data offset %u, but data offset is %u\n",
tcp_req, tqpair, tcp_req->h2c_offset, h2c_data->datao);
fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_OUT_OF_RANGE;
goto err;
}
if ((h2c_data->datao + h2c_data->datal) > tcp_req->req.length) {
SPDK_DEBUGLOG(nvmf_tcp,
"tcp_req(%p), tqpair=%p, (datao=%u + datal=%u) exceeds requested length=%u\n",
tcp_req, tqpair, h2c_data->datao, h2c_data->datal, tcp_req->req.length);
fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_OUT_OF_RANGE;
goto err;
}
pdu->req = tcp_req;
if (spdk_unlikely(tcp_req->req.dif_enabled)) {
pdu->dif_ctx = &tcp_req->req.dif.dif_ctx;
}
nvme_tcp_pdu_set_data_buf(pdu, tcp_req->req.iov, tcp_req->req.iovcnt,
h2c_data->datao, h2c_data->datal);
nvmf_tcp_qpair_set_recv_state(tqpair, NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_PAYLOAD);
return;
err:
nvmf_tcp_send_c2h_term_req(tqpair, pdu, fes, error_offset);
}
nvmf_tcp_req_parse_sgl
When processing a command that needs to receive data, SPDK enters the TCP_REQUEST_STATE_NEED_BUFFER state and calls nvmf_tcp_req_parse_sgl. For non-in-capsule data, nvmf_tcp_req_parse_sgl calls spdk_nvmf_request_get_buffers when the PDU is received to allocate data buffers from the pool; for in-capsule data, it points the request's iov at tcp_req->buf, allocated when the I/O queue was connected, which then ends up in the data_iov of struct nvme_tcp_pdu.
static int
nvmf_tcp_req_parse_sgl(struct spdk_nvmf_tcp_req *tcp_req,
struct spdk_nvmf_transport *transport,
struct spdk_nvmf_transport_poll_group *group)
{
struct spdk_nvmf_request *req = &tcp_req->req;
struct spdk_nvme_cmd *cmd;
struct spdk_nvme_sgl_descriptor *sgl;
struct spdk_nvmf_tcp_poll_group *tgroup;
enum spdk_nvme_tcp_term_req_fes fes;
struct nvme_tcp_pdu *pdu;
struct spdk_nvmf_tcp_qpair *tqpair;
uint32_t length, error_offset = 0;
cmd = &req->cmd->nvme_cmd;
sgl = &cmd->dptr.sgl1;
if (sgl->generic.type == SPDK_NVME_SGL_TYPE_TRANSPORT_DATA_BLOCK &&
sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_TRANSPORT) {★ Non-in-capsule data, transferred in separate data PDUs.
/* get request length from sgl */
length = sgl->unkeyed.length;
if (spdk_unlikely(length > transport->opts.max_io_size)) {
SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
length, transport->opts.max_io_size);
fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_LIMIT_EXCEEDED;
goto fatal_err;
}
/* fill request length and populate iovs */
req->length = length;
SPDK_DEBUGLOG(nvmf_tcp, "Data requested length= 0x%x\n", length);
if (spdk_unlikely(req->dif_enabled)) {
req->dif.orig_length = length;
length = spdk_dif_get_length_with_md(length, &req->dif.dif_ctx);
req->dif.elba_length = length;
}
if (nvmf_ctrlr_use_zcopy(req)) {
SPDK_DEBUGLOG(nvmf_tcp, "Using zero-copy to execute request %p\n", tcp_req);
req->data_from_pool = false;
return 0;
}
if (spdk_nvmf_request_get_buffers(req, group, transport, length)) {
/* No available buffers. Queue this request up. */
SPDK_DEBUGLOG(nvmf_tcp, "No available large data buffers. Queueing request %p\n",
tcp_req);
return 0;
}
/* backward compatible */
req->data = req->iov[0].iov_base;
SPDK_DEBUGLOG(nvmf_tcp, "Request %p took %d buffer/s from central pool, and data=%p\n",
tcp_req, req->iovcnt, req->iov[0].iov_base);
return 0;
} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
uint64_t offset = sgl->address;
uint32_t max_len = transport->opts.in_capsule_data_size;
assert(tcp_req->has_in_capsule_data);
/* Capsule Cmd with In-capsule Data should get data length from pdu header */
tqpair = tcp_req->pdu->qpair;
/* receiving pdu is not same with the pdu in tcp_req */
pdu = tqpair->pdu_in_progress;
length = pdu->hdr.common.plen - pdu->psh_len - sizeof(struct spdk_nvme_tcp_common_pdu_hdr);
if (tqpair->host_ddgst_enable) {
length -= SPDK_NVME_TCP_DIGEST_LEN;
}
/* This error is not defined in NVMe/TCP spec, take this error as fatal error */
if (spdk_unlikely(length != sgl->unkeyed.length)) {
SPDK_ERRLOG("In-Capsule Data length 0x%x is not equal to SGL data length 0x%x\n",
length, sgl->unkeyed.length);
fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD;
error_offset = offsetof(struct spdk_nvme_tcp_common_pdu_hdr, plen);
goto fatal_err;
}
SPDK_DEBUGLOG(nvmf_tcp, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
offset, length);
/* The NVMe/TCP transport does not use ICDOFF to control the in-capsule data offset. ICDOFF should be '0' */
if (spdk_unlikely(offset != 0)) {
/* Not defined fatal error in NVMe/TCP spec, handle this error as a fatal error */
SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " should be ZERO in NVMe/TCP\n", offset);
fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_DATA_UNSUPPORTED_PARAMETER;
error_offset = offsetof(struct spdk_nvme_tcp_cmd, ccsqe.dptr.sgl1.address);
goto fatal_err;
}
if (spdk_unlikely(length > max_len)) {
/* According to the SPEC we should support ICD up to 8192 bytes for admin and fabric commands */
if (length <= SPDK_NVME_TCP_IN_CAPSULE_DATA_MAX_SIZE &&
(cmd->opc == SPDK_NVME_OPC_FABRIC || req->qpair->qid == 0)) {
/* Get a buffer from dedicated list */
SPDK_DEBUGLOG(nvmf_tcp, "Getting a buffer from control msg list\n");
tgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_tcp_poll_group, group);
assert(tgroup->control_msg_list);
req->iov[0].iov_base = nvmf_tcp_control_msg_get(tgroup->control_msg_list);
if (!req->iov[0].iov_base) {
/* No available buffers. Queue this request up. */
SPDK_DEBUGLOG(nvmf_tcp, "No available ICD buffers. Queueing request %p\n", tcp_req);
return 0;
}
} else {
SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
length, max_len);
fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_LIMIT_EXCEEDED;
goto fatal_err;
}
} else {
req->iov[0].iov_base = tcp_req->buf;
}
req->length = length;
req->data_from_pool = false;
req->data = req->iov[0].iov_base;
if (spdk_unlikely(req->dif_enabled)) {
length = spdk_dif_get_length_with_md(length, &req->dif.dif_ctx);
req->dif.elba_length = length;
}
req->iov[0].iov_len = length;
req->iovcnt = 1;
return 0;
}
/* If we want to handle the problem here, then we can't skip the following data segment.
* Because this function runs before reading data part, now handle all errors as fatal errors. */
SPDK_ERRLOG("Invalid NVMf I/O Command SGL: Type 0x%x, Subtype 0x%x\n",
sgl->generic.type, sgl->generic.subtype);
fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_DATA_UNSUPPORTED_PARAMETER;
error_offset = offsetof(struct spdk_nvme_tcp_cmd, ccsqe.dptr.sgl1.generic);
fatal_err:
nvmf_tcp_send_c2h_term_req(tcp_req->pdu->qpair, tcp_req->pdu, fes, error_offset);
return -1;
}
References
Explanation of sockfs: hiboma/sockfs.md at master · hiboma/hiboma · GitHub
sk_buff – Linuxカーネルメモ (bit-hive.com)
TCPlinux.pdf (sn0rt.github.io)
internal22-282-受信処理アルゴリズム – Linux Kernel Documents Wiki – Linux Kernel Documents – OSDN