NewMadeleine

Documentation

nm_sendrecv_private.h
Go to the documentation of this file.
1/*
2 * NewMadeleine
3 * Copyright (C) 2006-2024 (see AUTHORS file)
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or (at
8 * your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 */
15
16#ifndef NM_SENDRECV_PRIVATE_H
17#define NM_SENDRECV_PRIVATE_H
18
25#include <nm_core_interface.h>
26#include <nm_session_private.h>
27#ifdef PIOMAN
28#include <pioman.h>
29#endif
30#ifdef MARCEL
31#include <marcel.h>
32#endif
33
34
35/* ** Events *********************************************** */
36
39{
42};
43#ifdef __cplusplus
44#define NM_SR_EVENT_MONITOR_NULL ((struct nm_sr_event_monitor_s){ (nm_sr_event_t)0, (nm_sr_event_notifier_t)NULL })
45#else /* __cplusplus */
46#define NM_SR_EVENT_MONITOR_NULL ((struct nm_sr_event_monitor_s){ .mask = 0, .notifier = NULL })
47#endif /* __cplusplus */
48
49/* ** Request ********************************************** */
50
53{
54 struct nm_req_s req;
57 void*ref;
58};
59
60#define NM_SR_REQUEST_NULL ((struct nm_sr_request_s){ .p_session = NULL, .monitor = NM_SR_EVENT_MONITOR_NULL, .ref = NULL})
61
62/* ** Requests inline ************************************** */
63
64static inline int nm_sr_request_isnull(nm_sr_request_t*p_request)
65{
66 return p_request->p_session == NULL;
67}
68
69static inline void*nm_sr_request_get_ref(nm_sr_request_t*p_request)
70{
71 return p_request->ref;
72}
73
75{
76 return p_request->p_session;
77}
78
80{
81 return nm_core_tag_get_tag(p_request->req.tag);
82}
83
85{
86 return p_request->req.p_gate;
87}
88
89static inline int nm_sr_request_set_ref(nm_sr_request_t*p_request, void*ref)
90{
91 if(!p_request->ref)
92 {
93 p_request->ref = ref;
94 return NM_ESUCCESS;
95 }
96 else
97 return -NM_EALREADY;
98}
99
101{
102 if(p_request->req.flags & NM_REQ_FLAG_PACK)
103 {
104 *size = p_request->req.pack.done;
105 return NM_ESUCCESS;
106 }
107 else if(p_request->req.flags & NM_REQ_FLAG_UNPACK)
108 {
109 *size = p_request->req.unpack.cumulated_len;
110 return NM_ESUCCESS;
111 }
112 else
113 {
115 return -NM_EINVAL;
116 }
117}
118
120{
121 if(p_request->req.flags & NM_REQ_FLAG_PACK)
122 {
123 *size = p_request->req.pack.len;
124 return NM_ESUCCESS;
125 }
126 else if(p_request->req.flags & NM_REQ_FLAG_UNPACK)
127 {
129 {
130 assert(p_request->req.unpack.expected_len != NM_LEN_UNDEFINED);
131 *size = p_request->req.unpack.expected_len;
132 return NM_ESUCCESS;
133 }
134 else
135 {
137 return -NM_EAGAIN;
138 }
139 }
140 else
141 {
143 return -NM_EINVAL;
144 }
145}
146
147static inline void nm_sr_request_wait_all(nm_sr_request_t**p_requests, int n)
148{
149 assert(n >= 1);
150 const nm_sr_request_t*p_req = NULL;
151 const uintptr_t offset = (uintptr_t)&p_req->req - (uintptr_t)p_req; /* offset of 'req' in nm_sr_request_t */
152 struct nm_core*p_core = nm_core_get_singleton();
153 nm_status_wait_all((void**)p_requests, n, offset, NM_STATUS_FINALIZED, p_core);
154}
156{
157 return nm_status_test_allbits(&p_request->req, status);
158}
159static inline int nm_sr_request_get_error(nm_sr_request_t*p_request)
160{
161 return p_request->req.err;
162}
163static inline int nm_sr_rwait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t*p_request)
164{
165 return nm_sr_request_wait(p_request);
166}
168{
169 nm_core_t p_core = nm_core_get_singleton();
170 nm_status_wait(&p_request->req, NM_STATUS_UNPACK_COMPLETED, p_core);
171 return p_request->req.err;
172}
173static inline int nm_sr_swait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t*p_request)
174{
175 return nm_sr_request_wait(p_request);
176}
177
178/* ** Send inline ****************************************** */
179
181{
182 nm_core_t p_core = nm_core_get_singleton();
183 nm_core_pack_init(p_core, &p_request->req);
185 p_request->ref = NULL;
186 p_request->p_session = p_session;
187}
188static inline void nm_sr_send_pack_data(nm_session_t p_session, nm_sr_request_t*p_request, const struct nm_data_s*p_data)
189{
190 nm_core_t p_core = nm_core_get_singleton();
191 nm_core_pack_data(p_core, &p_request->req, p_data);
192}
194 const void*ptr, nm_len_t len)
195{
196 struct nm_data_s data;
197 nm_data_contiguous_build(&data, (void*)ptr, len);
198 nm_sr_send_pack_data(p_session, p_request, &data);
199}
201 const struct iovec*v, int n)
202{
203 struct nm_data_s data;
204 nm_data_iov_build(&data, v, n);
205 nm_sr_send_pack_data(p_session, p_request, &data);
206}
207
210{
211 nm_core_t p_core = nm_core_get_singleton();
213 nm_core_pack_send(p_core, &p_request->req, core_tag, p_gate, 0);
214 return NM_ESUCCESS;
215}
216
218{
219 nm_core_t p_core = nm_core_get_singleton();
220 nm_core_pack_set_priority(p_core, &p_request->req, priority);
221}
222
224{
225 nm_core_t p_core = nm_core_get_singleton();
226 nm_core_pack_set_hlen(p_core, &p_request->req, hlen);
227 return NM_ESUCCESS;
228}
230{
231 nm_core_t p_core = nm_core_get_singleton();
232 nm_core_pack_submit(p_core, &p_request->req);
233 return NM_ESUCCESS;
234}
235
237 int n, const struct nm_chunk_s*p_chunks)
238{
239 nm_core_t p_core = nm_core_get_singleton();
240 nm_core_pack_submit_chunks(p_core, &p_request->req, n, p_chunks);
241 return NM_ESUCCESS;
242}
243
246{
247 nm_sr_send_dest(p_session, p_request, p_gate, tag);
248 nm_sr_send_submit(p_session, p_request);
249 return NM_ESUCCESS;
250}
253{
254 nm_core_t p_core = nm_core_get_singleton();
256 nm_core_pack_send(p_core, &p_request->req, core_tag, p_gate, NM_REQ_FLAG_PACK_SYNCHRONOUS);
257 nm_core_pack_submit(p_core, &p_request->req);
258 return NM_ESUCCESS;
259}
262{
263 nm_core_t p_core = nm_core_get_singleton();
265 nm_core_pack_send(p_core, &p_request->req, core_tag, p_gate, 0);
266 nm_core_pack_submit(p_core, &p_request->req);
267 return NM_ESUCCESS;
268}
269
270/* ** Recv inline ****************************************** */
271
273{
274 nm_core_t p_core = nm_core_get_singleton();
275 nm_core_unpack_init(p_core, &p_request->req);
277 p_request->ref = NULL;
278 p_request->p_session = p_session;
279}
280
281static inline void nm_sr_recv_unpack_data(nm_session_t p_session, nm_sr_request_t*p_request, const struct nm_data_s*p_data)
282{
283 nm_core_t p_core = nm_core_get_singleton();
284 nm_core_unpack_data(p_core, &p_request->req, p_data);
285}
286
288 void*ptr, nm_len_t len)
289{
290 struct nm_data_s data;
291 nm_data_contiguous_build(&data, ptr, len);
292 nm_sr_recv_unpack_data(p_session, p_request, &data);
293}
294
296 const struct iovec*v, int n)
297{
298 struct nm_data_s data;
299 nm_data_iov_build(&data, v, n);
300 nm_sr_recv_unpack_data(p_session, p_request, &data);
301}
302
305{
306 nm_sr_recv_match(p_session, p_request, p_gate, tag, mask);
307 return nm_sr_recv_post(p_session, p_request);
308}
309
312{
313 nm_core_t p_core = nm_core_get_singleton();
316 nm_core_unpack_match_recv(p_core, &p_request->req, p_gate, core_tag, core_mask);
317}
318
320 const nm_sr_event_info_t*p_event)
321{
322 nm_core_t p_core = nm_core_get_singleton();
323 nm_core_unpack_match_event(p_core, &p_request->req, p_event->recv_unexpected.p_core_event);
324}
325
327{
328 nm_core_t p_core = nm_core_get_singleton();
329 nm_core_unpack_submit(p_core, &p_request->req, NM_REQ_FLAG_NONE);
330 return NM_ESUCCESS;
331}
332
334 const struct nm_data_s*p_data)
335{
336 nm_core_t p_core = nm_core_get_singleton();
337 int err = nm_core_unpack_peek(p_core, &p_request->req, p_data, 0, nm_data_size(p_data));
338 return err;
339}
340
342 const struct nm_data_s*p_data, nm_len_t peek_offset, nm_len_t peek_len)
343{
344 nm_core_t p_core = nm_core_get_singleton();
345 int err = nm_core_unpack_peek(p_core, &p_request->req, p_data, peek_offset, peek_len);
346 return err;
347}
348
350{
351 nm_core_t p_core = nm_core_get_singleton();
352 nm_core_unpack_offset(p_core, &p_request->req, offset);
353}
354
356{
357 nm_core_t p_core = nm_core_get_singleton();
358 int err = nm_core_unpack_iprobe(p_core, &p_request->req);
359 return err;
360}
361
363{
364 nm_core_t p_core = nm_core_get_singleton();
365 assert(nm_status_test(&p_request->req, NM_STATUS_UNPACK_POSTED));
366 nm_status_wait(&p_request->req, NM_STATUS_UNPACK_DATA0, p_core);
367}
368
370{
371 nm_core_t p_core = nm_core_get_singleton();
372 assert(nm_status_test(&p_request->req, NM_STATUS_UNPACK_POSTED));
373 nm_status_wait(&p_request->req, NM_STATUS_UNPACK_DATA_SIZE, p_core);
374}
375
377{
378 assert(nm_status_test(&p_request->req, NM_STATUS_UNPACK_POSTED));
380 {
381 return NM_ESUCCESS;
382 }
383 else
384 {
386 return -NM_EAGAIN;
387 }
388}
389
390#endif /* NM_SENDRECV_PRIVATE_H*/
int nm_core_unpack_iprobe(struct nm_core *p_core, struct nm_req_s *p_unpack)
probes whether an incoming packet matched this unposted request.
void nm_core_pack_data(nm_core_t p_core, struct nm_req_s *p_pack, const struct nm_data_s *p_data)
build a pack request from data descriptor
void nm_core_unpack_init(struct nm_core *p_core, struct nm_req_s *p_unpack)
initializes an empty unpack request
#define NM_REQ_FLAG_UNPACK
request is an unpack
int nm_core_unpack_peek(struct nm_core *p_core, struct nm_req_s *p_unpack, const struct nm_data_s *p_data, nm_len_t peek_offset, nm_len_t peek_len)
peeks unexpected data without consumming it.
void nm_core_pack_send(struct nm_core *p_core, struct nm_req_s *p_pack, nm_core_tag_t tag, nm_gate_t p_gate, nm_req_flag_t flags)
set tag/gate/flags for pack request
void nm_core_unpack_offset(struct nm_core *p_core, struct nm_req_s *p_unpack, nm_len_t offset)
set an offset on data; data before offset will be discarded
#define NM_STATUS_UNPACK_DATA_SIZE
size of data is known (last chunk of data arrived), not unpacked yet- event triggered only if unpack ...
#define NM_STATUS_FINALIZED
request is finalized, may be freed
#define NM_REQ_FLAG_NONE
no flag set
void nm_core_unpack_match_recv(struct nm_core *p_core, struct nm_req_s *p_unpack, nm_gate_t p_gate, nm_core_tag_t tag, nm_core_tag_t tag_mask)
match an unpack request with given gate/tag, next sequence number assumed
void nm_core_pack_init(struct nm_core *p_core, struct nm_req_s *p_pack)
initializes an empty pack request
#define NM_CORE_TAG_HASH_FULL
mask for all sessions
#define NM_STATUS_UNPACK_DATA0
first byte of data arrived, not unpacked yet- event triggered only if unpack is posted without data s...
struct nm_core_event_s __attribute__
void nm_core_pack_submit(struct nm_core *p_core, struct nm_req_s *p_pack)
post a pack request
static void nm_status_wait_all(void **pp_reqs, int n, uintptr_t offset, nm_status_t bitmask, nm_core_t p_core)
wait for all reqs, any bit in bitmask
void nm_core_unpack_data(struct nm_core *p_core, struct nm_req_s *p_unpack, const struct nm_data_s *p_data)
build an unpack request from data descriptor
#define NM_REQ_FLAG_PACK_SYNCHRONOUS
flag pack as synchronous (i.e.
static void nm_core_pack_set_hlen(struct nm_core *p_core __attribute__((unused)), struct nm_req_s *p_pack, nm_len_t hlen)
set a header length for the given pack request
uint32_t nm_status_t
status bits of pack/unpack requests
static nm_core_tag_t nm_core_tag_build(nm_session_hash_t hashcode, nm_tag_t tag)
static int nm_status_test_allbits(struct nm_req_s *p_req, nm_status_t bitmask)
tests for all given bits in status
void nm_core_unpack_match_event(struct nm_core *p_core, struct nm_req_s *p_unpack, const struct nm_core_event_s *p_event)
match an unpack request with a packet that triggered an event
static nm_tag_t nm_core_tag_get_tag(nm_core_tag_t core_tag)
#define NM_REQ_FLAG_PACK
request is a pack
void nm_core_unpack_submit(struct nm_core *p_core, struct nm_req_s *p_unpack, nm_req_flag_t flags)
submit an unpack request
#define NM_STATUS_UNPACK_POSTED
unpack operation is in progress
void nm_core_pack_set_priority(struct nm_core *p_core, struct nm_req_s *p_pack, nm_prio_t priority)
set a priority for the given pack request
static void nm_status_wait(struct nm_req_s *p_req, nm_status_t bitmask, nm_core_t p_core)
wait for any bit matching in req status
#define NM_STATUS_UNPACK_COMPLETED
unpack operation has completed
static nm_status_t nm_status_test(const struct nm_req_s *p_req, nm_status_t bitmask)
query for given bits in req status; returns matched bits
void nm_core_pack_submit_chunks(struct nm_core *p_core, struct nm_req_s *p_pack, int n, const struct nm_chunk_s *p_chunks)
@ NM_EALREADY
already in progress or done
Definition: nm_errno.h:44
@ NM_EAGAIN
poll again
Definition: nm_errno.h:38
@ NM_ESUCCESS
successful operation
Definition: nm_errno.h:34
@ NM_EINVAL
invalid parameter
Definition: nm_errno.h:41
static nm_len_t nm_data_size(const struct nm_data_s *p_data)
returns the amount of data contained in the descriptor
Definition: nm_data.h:440
void(* nm_sr_event_notifier_t)(nm_sr_event_t event, const nm_sr_event_info_t *event_info, void *ref)
notification function for sendrecv events.
nm_sr_event_t
events for nm_sr_monitor()
int nm_sr_request_wait(nm_sr_request_t *p_request)
Wait for request completion (or cancelation)
nm_tag_t tag
the user-supplied tag
static void nm_data_contiguous_build(struct nm_data_s *p_data, void *ptr, nm_len_t len)
Definition: nm_data.h:315
static void nm_data_iov_build(struct nm_data_s *p_data, const struct iovec *v, int n)
Definition: nm_data.h:333
static nm_session_t p_session
static nm_gate_t p_gate
nm_prio_t priority
Definition: nm_headers.h:6
uint16_t hlen
length in header (header + data in header)
Definition: nm_headers.h:5
uint16_t len
chunk len
Definition: nm_headers.h:0
nm_mpi_status_t status
status of request
Definition: nm_mpi_private.h:7
nm_len_t size
size of the onsided data (not incuding target-side completion)
int nm_sr_progress(nm_session_t p_session)
Calls the scheduler.
static void nm_sr_recv_unpack_data(nm_session_t p_session, nm_sr_request_t *p_request, const struct nm_data_s *p_data)
static void nm_sr_recv_init(nm_session_t p_session, nm_sr_request_t *p_request)
static int nm_sr_send_submit_chunks(nm_session_t p_session, nm_sr_request_t *p_request, int n, const struct nm_chunk_s *p_chunks)
static int nm_sr_recv_peek(nm_session_t p_session, nm_sr_request_t *p_request, const struct nm_data_s *p_data)
static nm_gate_t nm_sr_request_get_gate(nm_sr_request_t *p_request)
static void nm_sr_send_set_priority(nm_session_t p_session, nm_sr_request_t *p_request, nm_prio_t priority)
static void nm_sr_recv_data_wait(nm_session_t p_session, nm_sr_request_t *p_request)
static int nm_sr_send_dest(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static void nm_sr_send_pack_contiguous(nm_session_t p_session, nm_sr_request_t *p_request, const void *ptr, nm_len_t len)
static int nm_sr_request_get_error(nm_sr_request_t *p_request)
static nm_tag_t nm_sr_request_get_tag(nm_sr_request_t *p_request)
static void nm_sr_recv_unpack_iov(nm_session_t p_session, nm_sr_request_t *p_request, const struct iovec *v, int n)
static int nm_sr_rwait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static int nm_sr_send_rsend(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static int nm_sr_send_header(nm_session_t p_session, nm_sr_request_t *p_request, nm_len_t hlen)
static int nm_sr_request_isnull(nm_sr_request_t *p_request)
static int nm_sr_recv_post(nm_session_t p_session, nm_sr_request_t *p_request)
static void nm_sr_send_init(nm_session_t p_session, nm_sr_request_t *p_request)
static int nm_sr_rwait_data(nm_session_t p_session, nm_sr_request_t *p_request)
static int nm_sr_send_isend(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static int nm_sr_send_issend(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static void nm_sr_recv_match_event(nm_session_t p_session, nm_sr_request_t *p_request, const nm_sr_event_info_t *p_event)
static void nm_sr_request_wait_all(nm_sr_request_t **p_requests, int n)
static int nm_sr_recv_data_test(nm_session_t p_session, nm_sr_request_t *p_request)
static void nm_sr_recv_offset(nm_session_t p_session, nm_sr_request_t *p_request, nm_len_t offset)
static int nm_sr_recv_peek_offset(nm_session_t p_session, nm_sr_request_t *p_request, const struct nm_data_s *p_data, nm_len_t peek_offset, nm_len_t peek_len)
static void nm_sr_recv_match(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag, nm_tag_t mask)
static int nm_sr_request_get_expected_size(nm_sr_request_t *p_request, nm_len_t *size)
static int nm_sr_request_get_size(nm_sr_request_t *p_request, nm_len_t *size)
static void nm_sr_recv_unpack_contiguous(nm_session_t p_session, nm_sr_request_t *p_request, void *ptr, nm_len_t len)
static int nm_sr_recv_irecv(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag, nm_tag_t mask)
#define NM_SR_EVENT_MONITOR_NULL
static nm_session_t nm_sr_request_get_session(nm_sr_request_t *p_request)
static void nm_sr_send_pack_data(nm_session_t p_session, nm_sr_request_t *p_request, const struct nm_data_s *p_data)
static int nm_sr_send_submit(nm_session_t p_session, nm_sr_request_t *p_request)
static int nm_sr_swait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static void * nm_sr_request_get_ref(nm_sr_request_t *p_request)
static int nm_sr_request_set_ref(nm_sr_request_t *p_request, void *ref)
static int nm_sr_recv_iprobe(nm_session_t p_session, nm_sr_request_t *p_request)
static int nm_sr_request_test(nm_sr_request_t *p_request, nm_status_t status)
static void nm_sr_send_pack_iov(nm_session_t p_session, nm_sr_request_t *p_request, const struct iovec *v, int n)
static void nm_sr_recv_data_size_wait(nm_session_t p_session, nm_sr_request_t *p_request)
#define NM_LEN_UNDEFINED
length is undefined
Definition: nm_types.h:73
int32_t nm_prio_t
message priority
Definition: nm_types.h:80
uint64_t nm_tag_t
user tags, 64 bits, contained in indirect hashtable
Definition: nm_types.h:58
uint64_t nm_len_t
data length used by nmad
Definition: nm_types.h:70
An internal tag.
Core NewMadeleine structure.
Definition: nm_core.h:39
a data descriptor, used to pack/unpack data from app layout to/from contiguous buffers
Definition: nm_data.h:189
Connection to another process.
Definition: nm_gate.h:100
a generic pack/unpack request
struct nm_req_s::@18::@21 unpack
nm_core_tag_t tag
tag to send to/from (works in combination with tag_mask for recv)
nm_len_t expected_len
length of posted recv (may be updated if matched packet is shorter)
nm_gate_t p_gate
dest/src gate; NULL if recv from any source
nm_len_t done
cumulated length of data sent so far
nm_req_flag_t flags
flags given by user
struct nm_req_s::@18::@20 pack
nm_len_t cumulated_len
amount of data unpacked so far
nm_len_t len
cumulated data length
int err
error status of the request
nm_session_hash_t hash_code
hash of session label, used as ID on the wire
descriptor for an event monitor
nm_sr_event_notifier_t notifier
notification function to call uppon event
nm_sr_event_t mask
event bitmask
internal defintion of the sendrecv request
void * ref
reference usable by end-user
struct nm_req_s req
inlined core pack/unpack request to avoid dynamic allocation
nm_session_t p_session
session this request belongs to
struct nm_sr_event_monitor_s monitor
events triggered on status transitions
information field for sendrecv events
struct nm_sr_event_info_t::@55 recv_unexpected
field for unexpected packets (global event NM_SR_EVENT_RECV_UNEXPECTED)
const struct nm_core_event_s * p_core_event