Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/framework/mpas_stream_list_types.inc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
integer :: clobber_mode
integer :: gattr_update = 1
integer :: io_type
integer :: output_done_marker = 0
type (MPAS_TimeInterval_type), pointer :: recordInterval => null()
type (MPAS_stream_list_type), pointer :: alarmList_in => null()
type (MPAS_stream_list_type), pointer :: alarmList_out => null()
Expand Down
54 changes: 53 additions & 1 deletion src/framework/mpas_stream_manager.F
Original file line number Diff line number Diff line change
Expand Up @@ -1796,6 +1796,9 @@ subroutine MPAS_stream_mgr_set_property_int(manager, streamID, propertyName, pro
case (MPAS_STREAM_PROPERTY_IOTYPE)
stream_cursor % io_type = propertyValue

case (MPAS_STREAM_PROPERTY_DONE_MARKER)
stream_cursor % output_done_marker = propertyValue

case default
STREAM_ERROR_WRITE('MPAS_stream_mgr_set_property(): No such property $i' COMMA intArgs=(/propertyName/))
STREAM_ERROR_WRITE(' or specified property is not of type integer.')
Expand Down Expand Up @@ -3500,6 +3503,11 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite
if ( swapRecords ) then
stream % nRecords = tempRecord
end if

! Write done marker file if enabled for this stream
if (stream % output_done_marker == 1) then
call write_done_marker(stream % filename, manager % ioContext % dminfo, stream % blockWrite)
end if
end if

end subroutine write_stream !}}}
Expand Down Expand Up @@ -6991,7 +6999,7 @@ subroutine stream_mgr_set_property_c(manager_c, streamID_c, propertyName_c, prop
use mpas_c_interfacing, only : mpas_c_to_f_string
use iso_c_binding, only : c_char, c_int, c_ptr, c_f_pointer
use mpas_derived_types, only : MPAS_streamManager_type, MPAS_STREAM_MGR_NOERR, &
MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS
MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, MPAS_STREAM_PROPERTY_DONE_MARKER
use mpas_stream_manager, only : MPAS_stream_mgr_set_property
use mpas_kind_types, only : StrKIND

Expand All @@ -7018,6 +7026,8 @@ subroutine stream_mgr_set_property_c(manager_c, streamID_c, propertyName_c, prop
! Map property name string to constant
if (trim(propertyName) == 'output_timelevels') then
call MPAS_stream_mgr_set_property(manager, streamID, MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, propertyValue, ierr=ierr)
else if (trim(propertyName) == 'output_done_marker') then
call MPAS_stream_mgr_set_property(manager, streamID, MPAS_STREAM_PROPERTY_DONE_MARKER, 1, ierr=ierr)
end if

if (ierr == MPAS_STREAM_MGR_NOERR) then
Expand Down Expand Up @@ -7131,3 +7141,45 @@ subroutine stream_mgr_add_variable_output_alarm_c(manager_c, streamID_c, ierr_c)
end if

end subroutine stream_mgr_add_variable_output_alarm_c !}}}


!|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
!
! subroutine write_done_marker
!
!> \brief Write an empty marker file to indicate output file completion
!> \author Guoqing Ge
!> \date March 2026
!> \details
!> Creates an empty file with .done extension after output file is closed.
!> Only called if output_done_marker property is true for the stream.
!> For normal parallel I/O (blockWrite=false), only rank 0 (IO_NODE) writes
!> the marker since all ranks write to the same file.
!> For blockWrite mode, each rank writes its own file, so each rank creates
!> its own marker file.
!
!-----------------------------------------------------------------------
subroutine write_done_marker(filename, dminfo, blockWrite)!{{{

use mpas_derived_types, only : dm_info
use mpas_dmpar, only : IO_NODE

implicit none

character(len=*), intent(in) :: filename
type (dm_info), intent(in) :: dminfo
logical, intent(in) :: blockWrite

character(len=1024) :: marker_filename
integer :: unit_num

! For blockWrite mode, each rank writes its own file so each creates marker
! For normal parallel I/O, all ranks write same file so only rank 0 creates marker
if (blockWrite .or. dminfo % my_proc_id == IO_NODE) then
marker_filename = trim(filename) // '.done'
unit_num = 99
open(unit=unit_num, file=trim(marker_filename), status='replace', action='write')
close(unit_num)
end if

end subroutine write_done_marker!}}}
3 changes: 2 additions & 1 deletion src/framework/mpas_stream_manager_types.inc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
MPAS_STREAM_PROPERTY_CLOBBER = 12, &
MPAS_STREAM_PROPERTY_IOTYPE = 13, &
MPAS_STREAM_PROPERTY_GATTR_UPDATE = 14, &
MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS = 15
MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS = 15, &
MPAS_STREAM_PROPERTY_DONE_MARKER = 16

integer, public, parameter :: MPAS_STREAM_CLOBBER_NEVER = 100, &
MPAS_STREAM_CLOBBER_APPEND = 101, &
Expand Down
26 changes: 26 additions & 0 deletions src/framework/xml_stream_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status)
/* First, handle changes to immutable stream filename templates, intervals, etc. */
immutable = 1;
for (stream_xml = ezxml_child(streams, "immutable_stream"); stream_xml; stream_xml = ezxml_next(stream_xml)) {
const char *output_done_marker;
streamID = ezxml_attr(stream_xml, "name");
direction = ezxml_attr(stream_xml, "type");
filename_template = ezxml_attr(stream_xml, "filename_template");
Expand All @@ -1126,6 +1127,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status)
clobber = ezxml_attr(stream_xml, "clobber_mode");
gattr_update = ezxml_attr(stream_xml, "gattr_update");
iotype = ezxml_attr(stream_xml, "io_type");
output_done_marker = ezxml_attr(stream_xml, "output_done_marker");

/* Extract the input interval, if it refer to other streams */
if ( interval_in ) {
Expand Down Expand Up @@ -1374,6 +1376,17 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status)
return;
}

/* If output_done_marker is specified, set it as a property */
if (output_done_marker != NULL && strstr(output_done_marker, "yes") != NULL) {
stream_mgr_set_property_c(manager, streamID, "output_done_marker", "1", &err);
if (err != 0) {
*status = 1;
return;
}
snprintf(msgbuf, MSGSIZE, " %-20s%s", "output done marker:", "yes");
mpas_log_write_c(msgbuf, "MPAS_LOG_OUT");
}

/* Possibly add an input alarm for this stream */
if (itype == 3 || itype == 1) {
stream_mgr_add_alarm_c(manager, streamID, "input", "start", interval_in2, &err);
Expand Down Expand Up @@ -1442,6 +1455,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status)
immutable = 0;
for (stream_xml = ezxml_child(streams, "stream"); stream_xml; stream_xml = ezxml_next(stream_xml)) {
const char *output_timelevels;
const char *output_done_marker;
streamID = ezxml_attr(stream_xml, "name");
direction = ezxml_attr(stream_xml, "type");
filename_template = ezxml_attr(stream_xml, "filename_template");
Expand All @@ -1458,6 +1472,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status)
clobber = ezxml_attr(stream_xml, "clobber_mode");
gattr_update = ezxml_attr(stream_xml, "gattr_update");
iotype = ezxml_attr(stream_xml, "io_type");
output_done_marker = ezxml_attr(stream_xml, "output_done_marker");

/* Extract the input interval, if it refer to other streams */
if ( interval_in ) {
Expand Down Expand Up @@ -1723,6 +1738,17 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status)
mpas_log_write_c(msgbuf, "MPAS_LOG_OUT");
}

/* If output_done_marker is specified, set it as a property */
if (output_done_marker != NULL && strstr(output_done_marker, "yes") != NULL) {
stream_mgr_set_property_c(manager, streamID, "output_done_marker", "1", &err);
if (err != 0) {
*status = 1;
return;
}
snprintf(msgbuf, MSGSIZE, " %-20s%s", "output done marker:", "yes");
mpas_log_write_c(msgbuf, "MPAS_LOG_OUT");
}

/* Possibly add an input alarm for this stream */
if (itype == 3 || itype == 1) {
stream_mgr_add_alarm_c(manager, streamID, "input", "start", interval_in2, &err);
Expand Down
Loading