@@ -837,8 +837,41 @@ static int input_chunk_write_header(struct cio_chunk *chunk, int event_type,
837
837
return 0 ;
838
838
}
839
839
840
- struct flb_input_chunk * flb_input_chunk_create (struct flb_input_instance * in , int event_type ,
841
- const char * tag , int tag_len )
840
+ static inline void errnum_set (int * errnum , int error )
841
+ {
842
+ if (errnum ) {
843
+ * errnum = error ;
844
+ }
845
+ }
846
+
847
+ static inline void errnum_set_from_errno (int * errnum )
848
+ {
849
+ if (errno ) {
850
+ errnum_set (errnum , errno );
851
+ }
852
+ }
853
+
854
+ static inline void errnum_set_cio (int * errnum , int cio_err )
855
+ {
856
+ switch (cio_err ) {
857
+ case CIO_OK :
858
+ errnum_set (errnum , 0 );
859
+ break ;
860
+ case CIO_CORRUPTED :
861
+ errnum_set (errnum , EIO );
862
+ break ;
863
+ case CIO_RETRY :
864
+ errnum_set (errnum , EAGAIN );
865
+ break ;
866
+ default :
867
+ case CIO_ERROR :
868
+ errnum_set (errnum , EINVAL );
869
+ break ;
870
+ }
871
+ }
872
+
873
+ static struct flb_input_chunk * input_chunk_create (struct flb_input_instance * in , int event_type ,
874
+ const char * tag , int tag_len , int * errnum )
842
875
{
843
876
int ret ;
844
877
int err ;
@@ -860,6 +893,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
860
893
if (!chunk ) {
861
894
flb_error ("[input chunk] could not create chunk file: %s:%s" ,
862
895
storage -> stream -> name , name );
896
+ errnum_set_cio (errnum , err );
863
897
return NULL ;
864
898
}
865
899
/*
@@ -870,6 +904,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
870
904
if (ret == CIO_FALSE ) {
871
905
ret = cio_chunk_up_force (chunk );
872
906
if (ret == -1 ) {
907
+ errnum_set (errnum , EIO );
873
908
cio_chunk_close (chunk , CIO_TRUE );
874
909
return NULL ;
875
910
}
@@ -879,6 +914,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
879
914
/* Write chunk header */
880
915
ret = input_chunk_write_header (chunk , event_type , (char * ) tag , tag_len );
881
916
if (ret == -1 ) {
917
+ errnum_set (errnum , EIO );
882
918
cio_chunk_close (chunk , CIO_TRUE );
883
919
return NULL ;
884
920
}
@@ -887,6 +923,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
887
923
ic = flb_calloc (1 , sizeof (struct flb_input_chunk ));
888
924
if (!ic ) {
889
925
flb_errno ();
926
+ errnum_set_from_errno (errnum );
890
927
cio_chunk_close (chunk , CIO_TRUE );
891
928
return NULL ;
892
929
}
@@ -938,6 +975,12 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
938
975
return ic ;
939
976
}
940
977
978
+ struct flb_input_chunk * flb_input_chunk_create (struct flb_input_instance * in , int event_type ,
979
+ const char * tag , int tag_len )
980
+ {
981
+ return input_chunk_create (in , event_type , tag , tag_len , NULL );
982
+ }
983
+
941
984
int flb_input_chunk_destroy_corrupted (struct flb_input_chunk * ic ,
942
985
const char * tag_buf , int tag_len ,
943
986
int del )
@@ -1109,7 +1152,8 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
1109
1152
static struct flb_input_chunk * input_chunk_get (struct flb_input_instance * in ,
1110
1153
int event_type ,
1111
1154
const char * tag , int tag_len ,
1112
- size_t chunk_size , int * set_down )
1155
+ size_t chunk_size , int * set_down ,
1156
+ int * errnum )
1113
1157
{
1114
1158
int id = -1 ;
1115
1159
int ret ;
@@ -1174,7 +1218,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
1174
1218
1175
1219
/* No chunk was found, we need to create a new one */
1176
1220
if (!ic ) {
1177
- ic = flb_input_chunk_create (in , event_type , (char * ) tag , tag_len );
1221
+ ic = input_chunk_create (in , event_type , (char * ) tag , tag_len , errnum );
1178
1222
new_chunk = FLB_TRUE ;
1179
1223
if (!ic ) {
1180
1224
return NULL ;
@@ -1198,6 +1242,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
1198
1242
if (new_chunk || flb_routes_mask_is_empty (ic -> routes_mask ) == FLB_TRUE ) {
1199
1243
flb_input_chunk_destroy (ic , FLB_TRUE );
1200
1244
}
1245
+ /* Set the error no ENOSPC so the caller knows we have hit a storage limit. */
1246
+ errnum_set (errnum , ENOSPC );
1201
1247
return NULL ;
1202
1248
}
1203
1249
@@ -1466,6 +1512,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
1466
1512
const void * buf , size_t buf_size )
1467
1513
{
1468
1514
int ret , total_records_start ;
1515
+ int err = 0 ;
1469
1516
int set_down = FLB_FALSE ;
1470
1517
int min ;
1471
1518
int new_chunk = FLB_FALSE ;
@@ -1513,7 +1560,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
1513
1560
1514
1561
if (ret != 0 ) {
1515
1562
/* we could not allocate the required space, just return */
1516
- return -1 ;
1563
+ return - ENOMEM ;
1517
1564
}
1518
1565
}
1519
1566
}
@@ -1522,7 +1569,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
1522
1569
if (flb_input_buf_paused (in ) == FLB_TRUE ) {
1523
1570
flb_debug ("[input chunk] %s is paused, cannot append records" ,
1524
1571
flb_input_name (in ));
1525
- return -1 ;
1572
+ return - EAGAIN ;
1526
1573
}
1527
1574
1528
1575
if (buf_size == 0 ) {
@@ -1549,10 +1596,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
1549
1596
* Get a target input chunk, can be one with remaining space available
1550
1597
* or a new one.
1551
1598
*/
1552
- ic = input_chunk_get (in , event_type , tag , tag_len , buf_size , & set_down );
1599
+ ic = input_chunk_get (in , event_type , tag , tag_len , buf_size , & set_down , & err );
1553
1600
if (!ic ) {
1554
1601
flb_error ("[input chunk] no available chunk" );
1555
- return -1 ;
1602
+ if (err != 0 ) {
1603
+ return - err ;
1604
+ }
1605
+ /* fallback on returning errno if it is set. */
1606
+ else if (errno != 0 ) {
1607
+ return - errno ;
1608
+ }
1609
+ return - EIO ;
1556
1610
}
1557
1611
1558
1612
/* newly created chunk */
@@ -1564,9 +1618,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
1564
1618
ret = flb_input_chunk_is_up (ic );
1565
1619
if (ret == FLB_FALSE ) {
1566
1620
ret = cio_chunk_up_force (ic -> chunk );
1567
- if (ret == -1 ) {
1621
+ if (ret <= CIO_ERROR ) {
1568
1622
flb_error ("[input chunk] cannot retrieve temporary chunk" );
1569
- return -1 ;
1623
+ switch (ret ) {
1624
+ case CIO_CORRUPTED :
1625
+ return - EIO ;
1626
+ case CIO_RETRY :
1627
+ return - EAGAIN ;
1628
+ case CIO_ERROR :
1629
+ return - ENOMEM ;
1630
+ }
1631
+ return - EINVAL ;
1570
1632
}
1571
1633
set_down = FLB_TRUE ;
1572
1634
}
@@ -1638,6 +1700,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
1638
1700
ret = flb_input_chunk_write (ic ,
1639
1701
final_data_buffer ,
1640
1702
final_data_size );
1703
+ err = errno ;
1641
1704
}
1642
1705
else {
1643
1706
ret = 0 ;
@@ -1661,8 +1724,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
1661
1724
flb_error ("[input chunk] error writing data from %s instance" ,
1662
1725
flb_input_name (in ));
1663
1726
cio_chunk_tx_rollback (ic -> chunk );
1664
-
1665
- return -1 ;
1727
+ if (err ) {
1728
+ return - err ;
1729
+ }
1730
+ return - EIO ;
1666
1731
}
1667
1732
1668
1733
/* get the chunks content size */
0 commit comments