diff --git a/src/framework/mpas_stream_list_types.inc b/src/framework/mpas_stream_list_types.inc index bd1f35615a..1b37574585 100644 --- a/src/framework/mpas_stream_list_types.inc +++ b/src/framework/mpas_stream_list_types.inc @@ -38,6 +38,7 @@ integer :: clobber_mode integer :: gattr_update = 1 integer :: io_type + integer :: output_done_marker = 0 type (MPAS_TimeInterval_type), pointer :: recordInterval => null() type (MPAS_stream_list_type), pointer :: alarmList_in => null() type (MPAS_stream_list_type), pointer :: alarmList_out => null() diff --git a/src/framework/mpas_stream_manager.F b/src/framework/mpas_stream_manager.F index 240588e427..48e07d7fe7 100644 --- a/src/framework/mpas_stream_manager.F +++ b/src/framework/mpas_stream_manager.F @@ -1796,6 +1796,9 @@ subroutine MPAS_stream_mgr_set_property_int(manager, streamID, propertyName, pro case (MPAS_STREAM_PROPERTY_IOTYPE) stream_cursor % io_type = propertyValue + case (MPAS_STREAM_PROPERTY_DONE_MARKER) + stream_cursor % output_done_marker = propertyValue + case default STREAM_ERROR_WRITE('MPAS_stream_mgr_set_property(): No such property $i' COMMA intArgs=(/propertyName/)) STREAM_ERROR_WRITE(' or specified property is not of type integer.') @@ -3500,6 +3503,11 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite if ( swapRecords ) then stream % nRecords = tempRecord end if + + ! Write done marker file if enabled for this stream + if (stream % output_done_marker == 1) then + call write_done_marker(stream % filename, manager % ioContext % dminfo, stream % blockWrite) + end if end if end subroutine write_stream !}}} @@ -6991,7 +6999,7 @@ subroutine stream_mgr_set_property_c(manager_c, streamID_c, propertyName_c, prop use mpas_c_interfacing, only : mpas_c_to_f_string use iso_c_binding, only : c_char, c_int, c_ptr, c_f_pointer use mpas_derived_types, only : MPAS_streamManager_type, MPAS_STREAM_MGR_NOERR, & - MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, MPAS_STREAM_PROPERTY_DONE_MARKER use mpas_stream_manager, only : MPAS_stream_mgr_set_property use mpas_kind_types, only : StrKIND @@ -7018,6 +7026,8 @@ subroutine stream_mgr_set_property_c(manager_c, streamID_c, propertyName_c, prop ! Map property name string to constant if (trim(propertyName) == 'output_timelevels') then call MPAS_stream_mgr_set_property(manager, streamID, MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, propertyValue, ierr=ierr) + else if (trim(propertyName) == 'output_done_marker') then + call MPAS_stream_mgr_set_property(manager, streamID, MPAS_STREAM_PROPERTY_DONE_MARKER, 1, ierr=ierr) end if if (ierr == MPAS_STREAM_MGR_NOERR) then @@ -7131,3 +7141,45 @@ subroutine stream_mgr_add_variable_output_alarm_c(manager_c, streamID_c, ierr_c) end if end subroutine stream_mgr_add_variable_output_alarm_c !}}} + + +!||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +! +! subroutine write_done_marker +! +!> \brief Write an empty marker file to indicate output file completion +!> \author Guoqing Ge +!> \date March 2026 +!> \details +!> Creates an empty file with .done extension after output file is closed. +!> Only called if output_done_marker property is true for the stream. +!> For normal parallel I/O (blockWrite=false), only rank 0 (IO_NODE) writes +!> the marker since all ranks write to the same file. +!> For blockWrite mode, each rank writes its own file, so each rank creates +!> its own marker file. +! +!----------------------------------------------------------------------- +subroutine write_done_marker(filename, dminfo, blockWrite)!{{{ + + use mpas_derived_types, only : dm_info + use mpas_dmpar, only : IO_NODE + + implicit none + + character(len=*), intent(in) :: filename + type (dm_info), intent(in) :: dminfo + logical, intent(in) :: blockWrite + + character(len=1024) :: marker_filename + integer :: unit_num + + ! For blockWrite mode, each rank writes its own file so each creates marker + ! For normal parallel I/O, all ranks write same file so only rank 0 creates marker + if (blockWrite .or. dminfo % my_proc_id == IO_NODE) then + marker_filename = trim(filename) // '.done' + unit_num = 99 + open(unit=unit_num, file=trim(marker_filename), status='replace', action='write') + close(unit_num) + end if + +end subroutine write_done_marker!}}} diff --git a/src/framework/mpas_stream_manager_types.inc b/src/framework/mpas_stream_manager_types.inc index f0c3809865..10d0a7fdbb 100644 --- a/src/framework/mpas_stream_manager_types.inc +++ b/src/framework/mpas_stream_manager_types.inc @@ -22,7 +22,8 @@ MPAS_STREAM_PROPERTY_CLOBBER = 12, & MPAS_STREAM_PROPERTY_IOTYPE = 13, & MPAS_STREAM_PROPERTY_GATTR_UPDATE = 14, & - MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS = 15 + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS = 15, & + MPAS_STREAM_PROPERTY_DONE_MARKER = 16 integer, public, parameter :: MPAS_STREAM_CLOBBER_NEVER = 100, & MPAS_STREAM_CLOBBER_APPEND = 101, & diff --git a/src/framework/xml_stream_parser.c b/src/framework/xml_stream_parser.c index 7ed16e378b..f4e649f109 100644 --- a/src/framework/xml_stream_parser.c +++ b/src/framework/xml_stream_parser.c @@ -1111,6 +1111,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) /* First, handle changes to immutable stream filename templates, intervals, etc. */ immutable = 1; for (stream_xml = ezxml_child(streams, "immutable_stream"); stream_xml; stream_xml = ezxml_next(stream_xml)) { + const char *output_done_marker; streamID = ezxml_attr(stream_xml, "name"); direction = ezxml_attr(stream_xml, "type"); filename_template = ezxml_attr(stream_xml, "filename_template"); @@ -1126,6 +1127,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) clobber = ezxml_attr(stream_xml, "clobber_mode"); gattr_update = ezxml_attr(stream_xml, "gattr_update"); iotype = ezxml_attr(stream_xml, "io_type"); + output_done_marker = ezxml_attr(stream_xml, "output_done_marker"); /* Extract the input interval, if it refer to other streams */ if ( interval_in ) { @@ -1374,6 +1376,17 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) return; } + /* If output_done_marker is specified, set it as a property */ + if (output_done_marker != NULL && strstr(output_done_marker, "yes") != NULL) { + stream_mgr_set_property_c(manager, streamID, "output_done_marker", "1", &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output done marker:", "yes"); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } + /* Possibly add an input alarm for this stream */ if (itype == 3 || itype == 1) { stream_mgr_add_alarm_c(manager, streamID, "input", "start", interval_in2, &err); @@ -1442,6 +1455,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) immutable = 0; for (stream_xml = ezxml_child(streams, "stream"); stream_xml; stream_xml = ezxml_next(stream_xml)) { const char *output_timelevels; + const char *output_done_marker; streamID = ezxml_attr(stream_xml, "name"); direction = ezxml_attr(stream_xml, "type"); filename_template = ezxml_attr(stream_xml, "filename_template"); @@ -1458,6 +1472,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) clobber = ezxml_attr(stream_xml, "clobber_mode"); gattr_update = ezxml_attr(stream_xml, "gattr_update"); iotype = ezxml_attr(stream_xml, "io_type"); + output_done_marker = ezxml_attr(stream_xml, "output_done_marker"); /* Extract the input interval, if it refer to other streams */ if ( interval_in ) { @@ -1723,6 +1738,17 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); } + /* If output_done_marker is specified, set it as a property */ + if (output_done_marker != NULL && strstr(output_done_marker, "yes") != NULL) { + stream_mgr_set_property_c(manager, streamID, "output_done_marker", "1", &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output done marker:", "yes"); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } + /* Possibly add an input alarm for this stream */ if (itype == 3 || itype == 1) { stream_mgr_add_alarm_c(manager, streamID, "input", "start", interval_in2, &err);