NewMadeleine

Documentation

« back to PM2 home.
nm_sendrecv_private.h
Go to the documentation of this file.
1/*
2 * NewMadeleine
3 * Copyright (C) 2006-2024 (see AUTHORS file)
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or (at
8 * your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 */
15
16#ifndef NM_SENDRECV_PRIVATE_H
17#define NM_SENDRECV_PRIVATE_H
18
#include <stddef.h>

#include <nm_core_interface.h>
#include <nm_session_private.h>
#ifdef PIOMAN
#include <pioman.h>
#endif
#ifdef MARCEL
#include <marcel.h>
#endif
33
34
35/* ** Events *********************************************** */
36
/** Null value for an event monitor: event mask NM_SR_EVENT_NONE and no notifier.
 * C uses designated initializers; C++ uses a positional initializer with
 * explicit casts (mask first, then notifier).
 */
#ifndef __cplusplus
#define NM_SR_EVENT_MONITOR_NULL ((struct nm_sr_event_monitor_s){ .mask = NM_SR_EVENT_NONE, .notifier = NULL })
#else /* __cplusplus */
#define NM_SR_EVENT_MONITOR_NULL ((struct nm_sr_event_monitor_s){ (nm_sr_event_t)NM_SR_EVENT_NONE, (nm_sr_event_notifier_t)NULL })
#endif /* __cplusplus */
48
49/* ** Request ********************************************** */
50
59
/** Null value for a sendrecv request: no session, null event monitor, no user reference.
 * Members that are not listed are zero-initialized by the compound literal.
 */
#define NM_SR_REQUEST_NULL \
  ((struct nm_sr_request_s){ \
    .p_session = NULL, \
    .monitor   = NM_SR_EVENT_MONITOR_NULL, \
    .ref       = NULL })
61
62/* ** Requests inline ************************************** */
63
64static inline int nm_sr_request_isnull(nm_sr_request_t*p_request)
65{
66 return p_request->p_session == NULL;
67}
68
69static inline void*nm_sr_request_get_ref(nm_sr_request_t*p_request)
70{
71 return p_request->ref;
72}
73
75{
76 return p_request->p_session;
77}
78
80{
81 return nm_core_tag_get_tag(p_request->req.tag);
82}
83
85{
86 return p_request->req.p_gate;
87}
88
89static inline int nm_sr_request_set_ref(nm_sr_request_t*p_request, void*ref)
90{
91 if(!p_request->ref)
92 {
93 p_request->ref = ref;
94 return NM_ESUCCESS;
95 }
96 else
97 return -NM_EALREADY;
98}
99
101{
102 if(p_request->req.flags & NM_REQ_FLAG_PACK)
103 {
104 *size = p_request->req.pack.done;
105 return NM_ESUCCESS;
106 }
107 else if(p_request->req.flags & NM_REQ_FLAG_UNPACK)
108 {
109 *size = p_request->req.unpack.cumulated_len;
110 return NM_ESUCCESS;
111 }
112 else
113 {
115 return -NM_EINVAL;
116 }
117}
118
120{
121 if(p_request->req.flags & NM_REQ_FLAG_PACK)
122 {
123 *size = p_request->req.pack.len;
124 return NM_ESUCCESS;
125 }
126 else if(p_request->req.flags & NM_REQ_FLAG_UNPACK)
127 {
129 {
130 assert(p_request->req.unpack.expected_len != NM_LEN_UNDEFINED);
131 *size = p_request->req.unpack.expected_len;
132 return NM_ESUCCESS;
133 }
134 else
135 {
137 return -NM_EAGAIN;
138 }
139 }
140 else
141 {
143 return -NM_EINVAL;
144 }
145}
146
147static inline void nm_sr_request_wait_all(nm_sr_request_t**p_requests, int n)
148{
149 assert(n >= 1);
150 const nm_sr_request_t*p_req = NULL;
151 const uintptr_t offset = (uintptr_t)&p_req->req - (uintptr_t)p_req; /* offset of 'req' in nm_sr_request_t */
152 struct nm_core*p_core = nm_core_get_singleton();
153 nm_status_wait_all((void**)p_requests, n, offset, NM_STATUS_FINALIZED, p_core);
154}
156{
157 return nm_status_test_allbits(&p_request->req, status);
158}
159static inline int nm_sr_request_get_error(nm_sr_request_t*p_request)
160{
161 return p_request->req.err;
162}
163static inline int nm_sr_rwait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t*p_request)
164{
165 return nm_sr_request_wait(p_request);
166}
168{
169 nm_core_t p_core = nm_core_get_singleton();
170 nm_status_wait(&p_request->req, NM_STATUS_UNPACK_COMPLETED, p_core);
171 return p_request->req.err;
172}
173static inline int nm_sr_swait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t*p_request)
174{
175 return nm_sr_request_wait(p_request);
176}
177
178/* ** Send inline ****************************************** */
179
181{
182 nm_core_t p_core = nm_core_get_singleton();
183 nm_core_pack_init(p_core, &p_request->req);
185 p_request->ref = NULL;
186 p_request->p_session = p_session;
187}
189 nm_sr_request_t*p_request, const struct nm_data_s*p_data)
190{
191 nm_core_t p_core = nm_core_get_singleton();
192 nm_core_pack_data(p_core, &p_request->req, p_data);
193}
195 const void*ptr, nm_len_t len)
196{
197 struct nm_data_s data;
198 nm_data_contiguous_build(&data, (void*)ptr, len);
199 nm_sr_send_pack_data(p_session, p_request, &data);
200}
202 const struct iovec*v, int n)
203{
204 struct nm_data_s data;
205 nm_data_iov_build(&data, v, n);
206 nm_sr_send_pack_data(p_session, p_request, &data);
207}
208
211{
212 nm_core_t p_core = nm_core_get_singleton();
214 nm_core_pack_send(p_core, &p_request->req, core_tag, p_gate, 0);
215 return NM_ESUCCESS;
216}
217
220{
221 nm_core_t p_core = nm_core_get_singleton();
222 nm_core_pack_set_priority(p_core, &p_request->req, priority);
223}
224
226 nm_sr_request_t*p_request, nm_len_t hlen)
227{
228 nm_core_t p_core = nm_core_get_singleton();
229 nm_core_pack_set_hlen(p_core, &p_request->req, hlen);
230 return NM_ESUCCESS;
231}
233{
234 nm_core_t p_core = nm_core_get_singleton();
235 nm_core_pack_submit(p_core, &p_request->req);
236 return NM_ESUCCESS;
237}
238
240 int n, const struct nm_chunk_s*p_chunks)
241{
242 nm_core_t p_core = nm_core_get_singleton();
243 nm_core_pack_submit_chunks(p_core, &p_request->req, n, p_chunks);
244 return NM_ESUCCESS;
245}
246
249{
250 nm_sr_send_dest(p_session, p_request, p_gate, tag);
251 nm_sr_send_submit(p_session, p_request);
252 return NM_ESUCCESS;
253}
256{
257 nm_core_t p_core = nm_core_get_singleton();
259 nm_core_pack_send(p_core, &p_request->req, core_tag, p_gate, NM_REQ_FLAG_PACK_SYNCHRONOUS);
260 nm_core_pack_submit(p_core, &p_request->req);
261 return NM_ESUCCESS;
262}
265{
266 nm_core_t p_core = nm_core_get_singleton();
268 nm_core_pack_send(p_core, &p_request->req, core_tag, p_gate, 0);
269 nm_core_pack_submit(p_core, &p_request->req);
270 return NM_ESUCCESS;
271}
272
273/* ** Recv inline ****************************************** */
274
276{
277 nm_core_t p_core = nm_core_get_singleton();
278 nm_core_unpack_init(p_core, &p_request->req);
280 p_request->ref = NULL;
281 p_request->p_session = p_session;
282}
283
285 nm_sr_request_t*p_request, const struct nm_data_s*p_data)
286{
287 nm_core_t p_core = nm_core_get_singleton();
288 nm_core_unpack_data(p_core, &p_request->req, p_data);
289}
290
292 void*ptr, nm_len_t len)
293{
294 struct nm_data_s data;
295 nm_data_contiguous_build(&data, ptr, len);
296 nm_sr_recv_unpack_data(p_session, p_request, &data);
297}
298
300 const struct iovec*v, int n)
301{
302 struct nm_data_s data;
303 nm_data_iov_build(&data, v, n);
304 nm_sr_recv_unpack_data(p_session, p_request, &data);
305}
306
309{
310 nm_sr_recv_match(p_session, p_request, p_gate, tag, mask);
311 return nm_sr_recv_post(p_session, p_request);
312}
313
316{
317 nm_core_t p_core = nm_core_get_singleton();
320 nm_core_unpack_match_recv(p_core, &p_request->req, p_gate, core_tag, core_mask);
321}
322
324 const nm_sr_event_info_t*p_event)
325{
326 nm_core_t p_core = nm_core_get_singleton();
327 nm_core_unpack_match_event(p_core, &p_request->req, p_event->recv_unexpected.p_core_event);
328}
329
331{
332 nm_core_t p_core = nm_core_get_singleton();
333 nm_core_unpack_submit(p_core, &p_request->req, NM_REQ_FLAG_NONE);
334 return NM_ESUCCESS;
335}
336
338 const struct nm_data_s*p_data)
339{
340 nm_core_t p_core = nm_core_get_singleton();
341 int err = nm_core_unpack_peek(p_core, &p_request->req, p_data, 0, nm_data_size(p_data));
342 return err;
343}
344
346 const struct nm_data_s*p_data, nm_len_t peek_offset, nm_len_t peek_len)
347{
348 nm_core_t p_core = nm_core_get_singleton();
349 int err = nm_core_unpack_peek(p_core, &p_request->req, p_data, peek_offset, peek_len);
350 return err;
351}
352
354 nm_sr_request_t*p_request, nm_len_t offset)
355{
356 nm_core_t p_core = nm_core_get_singleton();
357 nm_core_unpack_offset(p_core, &p_request->req, offset);
358}
359
361{
362 nm_core_t p_core = nm_core_get_singleton();
363 int err = nm_core_unpack_iprobe(p_core, &p_request->req);
364 return err;
365}
366
368{
369 nm_core_t p_core = nm_core_get_singleton();
370 assert(nm_status_test(&p_request->req, NM_STATUS_UNPACK_POSTED));
371 nm_status_wait(&p_request->req, NM_STATUS_UNPACK_DATA0, p_core);
372}
373
375{
376 nm_core_t p_core = nm_core_get_singleton();
377 assert(nm_status_test(&p_request->req, NM_STATUS_UNPACK_POSTED));
378 nm_status_wait(&p_request->req, NM_STATUS_UNPACK_DATA_SIZE, p_core);
379}
380
382{
383 assert(nm_status_test(&p_request->req, NM_STATUS_UNPACK_POSTED));
385 {
386 return NM_ESUCCESS;
387 }
388 else
389 {
391 return -NM_EAGAIN;
392 }
393}
394
395#endif /* NM_SENDRECV_PRIVATE_H*/
struct nm_core_event_s __attribute__
int nm_core_unpack_iprobe(struct nm_core *p_core, struct nm_req_s *p_unpack)
probes whether an incoming packet matched this unposted request.
void nm_core_pack_data(nm_core_t p_core, struct nm_req_s *p_pack, const struct nm_data_s *p_data)
build a pack request from data descriptor
void nm_core_unpack_init(struct nm_core *p_core, struct nm_req_s *p_unpack)
initializes an empty unpack request
int nm_core_unpack_peek(struct nm_core *p_core, struct nm_req_s *p_unpack, const struct nm_data_s *p_data, nm_len_t peek_offset, nm_len_t peek_len)
peeks unexpected data without consuming it.
void nm_core_pack_send(struct nm_core *p_core, struct nm_req_s *p_pack, nm_core_tag_t tag, nm_gate_t p_gate, nm_req_flag_t flags)
set tag/gate/flags for pack request
void nm_core_unpack_offset(struct nm_core *p_core, struct nm_req_s *p_unpack, nm_len_t offset)
set an offset on data; data before offset will be discarded
void nm_core_unpack_match_recv(struct nm_core *p_core, struct nm_req_s *p_unpack, nm_gate_t p_gate, nm_core_tag_t tag, nm_core_tag_t tag_mask)
match an unpack request with given gate/tag, next sequence number assumed
void nm_core_pack_init(struct nm_core *p_core, struct nm_req_s *p_pack)
initializes an empty pack request
void nm_core_pack_submit(struct nm_core *p_core, struct nm_req_s *p_pack)
post a pack request
void nm_core_unpack_data(struct nm_core *p_core, struct nm_req_s *p_unpack, const struct nm_data_s *p_data)
build an unpack request from data descriptor
static void nm_core_pack_set_hlen(struct nm_core *p_core __attribute__((unused)), struct nm_req_s *p_pack, nm_len_t hlen)
set a header length for the given pack request
void nm_core_unpack_match_event(struct nm_core *p_core, struct nm_req_s *p_unpack, const struct nm_core_event_s *p_event)
match an unpack request with a packet that triggered an event
void nm_core_unpack_submit(struct nm_core *p_core, struct nm_req_s *p_unpack, nm_req_flag_t flags)
submit an unpack request
void nm_core_pack_set_priority(struct nm_core *p_core, struct nm_req_s *p_pack, nm_prio_t priority)
set a priority for the given pack request
void nm_core_pack_submit_chunks(struct nm_core *p_core, struct nm_req_s *p_pack, int n, const struct nm_chunk_s *p_chunks)
#define NM_CORE_TAG_HASH_FULL
mask for all sessions
static nm_core_tag_t nm_core_tag_build(nm_session_hash_t hashcode, nm_tag_t tag)
static nm_tag_t nm_core_tag_get_tag(nm_core_tag_t core_tag)
static nm_len_t nm_data_size(const struct nm_data_s *p_data)
returns the amount of data contained in the descriptor
Definition nm_data.h:483
@ NM_EALREADY
already in progress or done
Definition nm_errno.h:44
@ NM_EAGAIN
poll again
Definition nm_errno.h:39
@ NM_ESUCCESS
successful operation
Definition nm_errno.h:36
@ NM_EINVAL
invalid parameter
Definition nm_errno.h:42
static void nm_status_wait_all(void **pp_reqs, int n, uintptr_t offset, nm_status_t bitmask, nm_core_t p_core)
wait for all reqs, any bit in bitmask
static int nm_status_test_allbits(struct nm_req_s *p_req, nm_status_t bitmask)
tests for all given bits in status
static void nm_status_wait(struct nm_req_s *p_req, nm_status_t bitmask, nm_core_t p_core)
wait for any bit matching in req status
static nm_status_t nm_status_test(const struct nm_req_s *p_req, nm_status_t bitmask)
query for given bits in req status; returns matched bits
#define NM_REQ_FLAG_UNPACK
request is an unpack
#define NM_STATUS_UNPACK_DATA_SIZE
size of data is known (last chunk of data arrived), not unpacked yet- event triggered only if unpack ...
#define NM_STATUS_FINALIZED
request is finalized, may be freed
#define NM_REQ_FLAG_NONE
no flag set
#define NM_STATUS_UNPACK_DATA0
first byte of data arrived, not unpacked yet- event triggered only if unpack is posted without data s...
#define NM_REQ_FLAG_PACK_SYNCHRONOUS
flag pack as synchronous (i.e.
uint32_t nm_status_t
status bits of pack/unpack requests
#define NM_REQ_FLAG_PACK
request is a pack
#define NM_STATUS_UNPACK_POSTED
unpack operation is in progress
#define NM_STATUS_UNPACK_COMPLETED
unpack operation has completed
void(* nm_sr_event_notifier_t)(nm_sr_event_t event, const nm_sr_event_info_t *event_info, void *ref)
notification function for sendrecv events.
nm_sr_event_t
events for nm_sr_monitor()
int nm_sr_request_wait(nm_sr_request_t *p_request)
Wait for request completion (or cancelation)
nm_tag_t tag
the user-supplied tag
static void nm_data_contiguous_build(struct nm_data_s *p_data, void *ptr, nm_len_t len)
Definition nm_data.h:355
static void nm_data_iov_build(struct nm_data_s *p_data, const struct iovec *v, int n)
Definition nm_data.h:374
static nm_session_t p_session
static nm_gate_t p_gate
nm_prio_t priority
Definition nm_headers.h:6
uint16_t hlen
length in header (header + data in header)
Definition nm_headers.h:5
uint16_t len
chunk len
Definition nm_headers.h:0
nm_mpi_status_t status
status of request
nm_len_t size
size of the one-sided data (not including target-side completion)
int nm_sr_progress(nm_session_t p_session)
Calls the scheduler.
static int nm_sr_send_header(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, nm_len_t hlen)
static void nm_sr_recv_init(nm_session_t p_session, nm_sr_request_t *p_request)
static void nm_sr_recv_data_size_wait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static nm_gate_t nm_sr_request_get_gate(nm_sr_request_t *p_request)
static int nm_sr_rwait_data(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static void nm_sr_recv_data_wait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static int nm_sr_send_dest(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static int nm_sr_send_submit_chunks(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, int n, const struct nm_chunk_s *p_chunks)
static void nm_sr_send_pack_contiguous(nm_session_t p_session, nm_sr_request_t *p_request, const void *ptr, nm_len_t len)
static int nm_sr_request_get_error(nm_sr_request_t *p_request)
static nm_tag_t nm_sr_request_get_tag(nm_sr_request_t *p_request)
static void nm_sr_recv_unpack_iov(nm_session_t p_session, nm_sr_request_t *p_request, const struct iovec *v, int n)
static int nm_sr_rwait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static int nm_sr_send_rsend(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static int nm_sr_request_isnull(nm_sr_request_t *p_request)
static void nm_sr_send_init(nm_session_t p_session, nm_sr_request_t *p_request)
static void nm_sr_recv_unpack_data(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, const struct nm_data_s *p_data)
static int nm_sr_send_isend(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static int nm_sr_send_issend(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag)
static void nm_sr_request_wait_all(nm_sr_request_t **p_requests, int n)
static void nm_sr_recv_match_event(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, const nm_sr_event_info_t *p_event)
static void nm_sr_recv_offset(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, nm_len_t offset)
static int nm_sr_send_submit(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static int nm_sr_recv_data_test(nm_session_t p_session, nm_sr_request_t *p_request)
static void nm_sr_recv_match(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag, nm_tag_t mask)
static int nm_sr_recv_peek(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, const struct nm_data_s *p_data)
static void nm_sr_send_pack_data(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, const struct nm_data_s *p_data)
static int nm_sr_request_get_expected_size(nm_sr_request_t *p_request, nm_len_t *size)
static int nm_sr_request_get_size(nm_sr_request_t *p_request, nm_len_t *size)
static void nm_sr_recv_unpack_contiguous(nm_session_t p_session, nm_sr_request_t *p_request, void *ptr, nm_len_t len)
static void nm_sr_send_set_priority(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, nm_prio_t priority)
static int nm_sr_recv_irecv(nm_session_t p_session, nm_sr_request_t *p_request, nm_gate_t p_gate, nm_tag_t tag, nm_tag_t mask)
static int nm_sr_recv_peek_offset(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request, const struct nm_data_s *p_data, nm_len_t peek_offset, nm_len_t peek_len)
#define NM_SR_EVENT_MONITOR_NULL
static nm_session_t nm_sr_request_get_session(nm_sr_request_t *p_request)
static int nm_sr_recv_iprobe(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static int nm_sr_recv_post(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static int nm_sr_swait(nm_session_t p_session __attribute__((unused)), nm_sr_request_t *p_request)
static void * nm_sr_request_get_ref(nm_sr_request_t *p_request)
static int nm_sr_request_set_ref(nm_sr_request_t *p_request, void *ref)
static int nm_sr_request_test(nm_sr_request_t *p_request, nm_status_t status)
static void nm_sr_send_pack_iov(nm_session_t p_session, nm_sr_request_t *p_request, const struct iovec *v, int n)
#define NM_LEN_UNDEFINED
length is undefined
Definition nm_types.h:71
int32_t nm_prio_t
message priority
Definition nm_types.h:78
uint64_t nm_tag_t
user tags, 64 bits, contained in indirect hashtable
Definition nm_types.h:56
uint64_t nm_len_t
data length used by nmad
Definition nm_types.h:68
An internal tag.
Core NewMadeleine structure.
Definition nm_core.h:43
a data descriptor, used to pack/unpack data from app layout to/from contiguous buffers
Definition nm_data.h:196
Connection to another process.
Definition nm_gate.h:104
a generic pack/unpack request
nm_core_tag_t tag
tag to send to/from (works in combination with tag_mask for recv)
nm_len_t expected_len
length of posted recv (may be updated if matched packet is shorter)
nm_gate_t p_gate
dest/src gate; NULL if recv from any source
nm_len_t done
cumulated length of data sent so far
nm_req_flag_t flags
flags given by user
struct nm_req_s::@14::@17 unpack
struct nm_req_s::@14::@16 pack
nm_len_t cumulated_len
amount of data unpacked so far
nm_len_t len
cumulated data length
int err
error status of the request
nm_session_hash_t hash_code
hash of session label, used as ID on the wire
descriptor for an event monitor
nm_sr_event_notifier_t notifier
notification function to call upon event
nm_sr_event_t mask
event bitmask
internal definition of the sendrecv request
void * ref
reference usable by end-user
struct nm_req_s req
inlined core pack/unpack request to avoid dynamic allocation
nm_session_t p_session
session this request belongs to
struct nm_sr_event_monitor_s monitor
events triggered on status transitions
information field for sendrecv events
struct nm_sr_event_info_t::@75 recv_unexpected
field for unexpected packets (global event NM_SR_EVENT_RECV_UNEXPECTED)
const struct nm_core_event_s * p_core_event