summaryrefslogtreecommitdiff
blob: c906c05f0e5525b8020f17fe4faa3e39b1e6e8f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
From b435f4e7d2ece7f2ea0a7b42826498e224be3f23 Mon Sep 17 00:00:00 2001
From: Rainer Gerhards <rgerhards@adiscon.com>
Date: Wed, 3 Feb 2016 16:32:07 +0100
Subject: [PATCH] bugfix: queue engine can loose one message during queue
 shutdown

... due to improper checking of return states.

closes https://github.com/rsyslog/rsyslog/issues/262
---
 runtime/ruleset.c | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index ae5a9bd..6ca97e8 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -495,6 +495,7 @@ processBatch(batch_t *pBatch, wti_t *pWti)
 	int i;
 	msg_t *pMsg;
 	ruleset_t *pRuleset;
+	rsRetVal localRet;
 	DEFiRet;
 
 	DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem);
@@ -506,15 +507,19 @@ processBatch(batch_t *pBatch, wti_t *pWti)
 		pMsg = pBatch->pElem[i].pMsg;
 		DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg);
 		pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset;
-		scriptExec(pRuleset->root, pMsg, pWti);
-		// TODO: think if we need a return state of scriptExec - most probably
-		// the answer is "no", as we need to process the batch in any case!
-		// TODO: we must refactor this!  flag messages as committed
-		batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+		localRet = scriptExec(pRuleset->root, pMsg, pWti);
+		/* the most important case here is that processing may be aborted
+		 * due to pbShutdownImmediate, in which case we MUST NOT flag this
+		 * message as committed. If we would do so, the message would
+		 * potentially be lost.
+		 */
+		if(localRet == RS_RET_OK)
+			batchSetElemState(pBatch, i, BATCH_STATE_COMM);
 	}
 
 	/* commit phase */
-	dbgprintf("END batch execution phase, entering to commit phase\n");
+	DBGPRINTF("END batch execution phase, entering to commit phase "
+		"[processed %d of %d messages]\n", i, batchNumMsgs(pBatch));
 	actionCommitAllDirect(pWti);
 
 	DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem);