From: Roger Light <roger@atchoo.org>
Date: Thu, 31 Jan 2019 21:50:42 +0000
Subject: Fix and tests for CVE-2018-12546.

diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml
index 8b212e2..d941c98 100644
--- a/man/mosquitto.conf.5.xml
+++ b/man/mosquitto.conf.5.xml
@@ -294,6 +294,24 @@
 					<para>Reloaded on reload signal.</para>
 				</listitem>
 			</varlistentry>
+			<varlistentry>
+				<term><option>check_retain_source</option> [ true | false ]</term>
+				<listitem>
+					<para>This option affects the scenario when a client
+						subscribes to a topic that has retained messages. It is
+						possible that the client that published the retained
+						message to the topic had access at the time they
+						published, but that access has been subsequently
+						removed. If <option>check_retain_source</option> is set
+						to true, the default, the source of a retained message
+						will be checked for access rights before it is
+						republished. When set to false, no check will be made
+						and the retained message will always be
+						published.</para>
+					<para>This option applies globally, regardless of the
+						<option>per_listener_settings</option> option.</para>
+				</listitem>
+			</varlistentry>
 			<varlistentry>
 				<term><option>clientid_prefixes</option> <replaceable>prefix</replaceable></term>
 				<listitem>
diff --git a/mosquitto.conf b/mosquitto.conf
index 9645005..e8c4339 100644
--- a/mosquitto.conf
+++ b/mosquitto.conf
@@ -148,6 +148,15 @@
 # setting behaviour from previous versions of mosquitto.
 #per_listener_settings false
 
+# This option affects the scenario when a client subscribes to a topic that has
+# retained messages. It is possible that the client that published the retained
+# message to the topic had access at the time they published, but that access
+# has been subsequently removed. If check_retain_source is set to true, the
+# default, the source of a retained message will be checked for access rights
+# before it is republished. When set to false, no check will be made and the
+# retained message will always be published. This affects all listeners.
+#check_retain_source true
+
 
 # =================================================================
 # Default listener
diff --git a/src/conf.c b/src/conf.c
index c995bab..f81219f 100644
--- a/src/conf.c
+++ b/src/conf.c
@@ -1093,6 +1093,9 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
 #else
 					log__printf(NULL, MOSQ_LOG_WARNING, "Warning: TLS support not available.");
 #endif
+				}else if(!strcmp(token, "check_retain_source")){
+					conf__set_cur_security_options(config, cur_listener, &cur_security_options);
+					if(conf__parse_bool(&token, "check_retain_source", &config->check_retain_source, saveptr)) return MOSQ_ERR_INVAL;
 				}else if(!strcmp(token, "ciphers")){
 #ifdef WITH_TLS
 					if(reload) continue; // Listeners not valid for reloading.
diff --git a/src/database.c b/src/database.c
index 2f76de1..8253cb4 100644
--- a/src/database.c
+++ b/src/database.c
@@ -200,6 +200,7 @@ void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *s
 	db->msg_store_bytes -= store->payloadlen;
 
 	mosquitto__free(store->source_id);
+	mosquitto__free(store->source_username);
 	if(store->dest_ids){
 		for(i=0; i<store->dest_id_count; i++){
 			mosquitto__free(store->dest_ids[i]);
@@ -587,18 +588,19 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
 	}
 	memcpy(UHPA_ACCESS(payload_uhpa, payloadlen), payload, payloadlen);
 
+	if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, 0)) return 1;
+
 	if(context && context->id){
 		source_id = context->id;
 	}else{
 		source_id = "";
 	}
-	if(db__message_store(db, source_id, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, 0)) return 1;
 
 	return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
 }
 
 /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload. */
-int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
+int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
 {
 	struct mosquitto_msg_store *temp = NULL;
 	int rc = MOSQ_ERR_SUCCESS;
@@ -606,7 +608,7 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
 	assert(db);
 	assert(stored);
 
-	temp = mosquitto__malloc(sizeof(struct mosquitto_msg_store));
+	temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
 	if(!temp){
 		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
 		rc = MOSQ_ERR_NOMEM;
@@ -617,8 +619,8 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
 	temp->payload.ptr = NULL;
 
 	temp->ref_count = 0;
-	if(source){
-		temp->source_id = mosquitto__strdup(source);
+	if(source && source->id){
+		temp->source_id = mosquitto__strdup(source->id);
 	}else{
 		temp->source_id = mosquitto__strdup("");
 	}
@@ -627,6 +629,17 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
 		rc = MOSQ_ERR_NOMEM;
 		goto error;
 	}
+
+	if(source && source->username){
+		temp->source_username = mosquitto__strdup(source->username);
+		if(!temp->source_username){
+			rc = MOSQ_ERR_NOMEM;
+			goto error;
+		}
+	}
+	if(source){
+		temp->source_listener = source->listener;
+	}
 	temp->source_mid = source_mid;
 	temp->mid = 0;
 	temp->qos = qos;
@@ -659,6 +672,7 @@ error:
 	mosquitto__free(topic);
 	if(temp){
 		mosquitto__free(temp->source_id);
+		mosquitto__free(temp->source_username);
 		mosquitto__free(temp->topic);
 		mosquitto__free(temp);
 	}
diff --git a/src/db_dump/Makefile b/src/db_dump/Makefile
index 13ae261..202af87 100644
--- a/src/db_dump/Makefile
+++ b/src/db_dump/Makefile
@@ -1,6 +1,6 @@
 include ../../config.mk
 
-CFLAGS_FINAL=${CFLAGS} -I.. -I../../lib -I../..
+CFLAGS_FINAL=${CFLAGS} -I.. -I../../lib -I../.. -I../deps
 
 .PHONY: all clean reallyclean
 
diff --git a/src/db_dump/db_dump.c b/src/db_dump/db_dump.c
index e009cce..62bf24b 100644
--- a/src/db_dump/db_dump.c
+++ b/src/db_dump/db_dump.c
@@ -83,7 +83,9 @@ struct db_msg
 	uint8_t qos, retain;
 	uint8_t *payload;
 	char *source_id;
+	char *source_username;
 	char *topic;
+	uint16_t source_port;
 };
 
 static uint32_t db_version;
@@ -177,6 +179,8 @@ print_db_msg(struct db_msg *msg, int length)
 	printf("\tLength: %d\n", length);
 	printf("\tStore ID: %" PRIu64 "\n", msg->store_id);
 	printf("\tSource ID: %s\n", msg->source_id);
+	printf("\tSource Username: %s\n", msg->source_username);
+	printf("\tSource Port: %d\n", msg->source_port);
 	printf("\tSource MID: %d\n", msg->source_mid);
 	printf("\tMID: %d\n", msg->mid);
 	printf("\tTopic: %s\n", msg->topic);
@@ -194,26 +198,49 @@ print_db_msg(struct db_msg *msg, int length)
 }
 
 
+static int persist__read_string(FILE *db_fptr, char **str)
+{
+	uint16_t i16temp;
+	uint16_t slen;
+	char *s = NULL;
+
+	if(fread(&i16temp, 1, sizeof(uint16_t), db_fptr) != sizeof(uint16_t)){
+		return MOSQ_ERR_INVAL;
+	}
+
+	slen = ntohs(i16temp);
+	if(slen){
+		s = mosquitto__malloc(slen+1);
+		if(!s){
+			fclose(db_fptr);
+			fprintf(stderr, "Error: Out of memory.\n");
+			return MOSQ_ERR_NOMEM;
+		}
+		if(fread(s, 1, slen, db_fptr) != slen){
+			mosquitto__free(s);
+			fprintf(stderr, "Error: %s.\n", strerror(errno));
+			return MOSQ_ERR_INVAL;
+		}
+		s[slen] = '\0';
+	}
+
+	*str = s;
+	return MOSQ_ERR_SUCCESS;
+}
+
+
 static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd, struct db_client *client)
 {
-	uint16_t i16temp, slen;
+	uint16_t i16temp;
 	int rc = 0;
 	struct client_chunk *cc;
 
-	read_e(db_fd, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	if(!slen){
+	rc = persist__read_string(db_fd, &client->client_id);
+	if(rc){
 		fprintf(stderr, "Error: Corrupt persistent database.");
 		fclose(db_fd);
-		return 1;
+		return rc;
 	}
-	client->client_id = calloc(slen+1, sizeof(char));
-	if(!client->client_id){
-		fclose(db_fd);
-		fprintf(stderr, "Error: Out of memory.");
-		return 1;
-	}
-	read_e(db_fd, client->client_id, slen);
 
 	read_e(db_fd, &i16temp, sizeof(uint16_t));
 	client->last_mid = ntohs(i16temp);
@@ -245,24 +272,17 @@ error:
 static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_client_msg *msg)
 {
 	dbid_t i64temp;
-	uint16_t i16temp, slen;
+	uint16_t i16temp;
 	struct client_chunk *cc;
 	struct msg_store_chunk *msc;
+	int rc;
 
-	read_e(db_fd, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	if(!slen){
+	rc = persist__read_string(db_fd, &msg->client_id);
+	if(rc){
 		fprintf(stderr, "Error: Corrupt persistent database.");
 		fclose(db_fd);
-		return 1;
+		return rc;
 	}
-	msg->client_id = calloc(slen+1, sizeof(char));
-	if(!msg->client_id){
-		fclose(db_fd);
-		fprintf(stderr, "Error: Out of memory.");
-		return 1;
-	}
-	read_e(db_fd, msg->client_id, slen);
 
 	read_e(db_fd, &i64temp, sizeof(dbid_t));
 	msg->store_id = i64temp;
@@ -301,58 +321,48 @@ static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uin
 {
 	dbid_t i64temp;
 	uint32_t i32temp;
-	uint16_t i16temp, slen;
+	uint16_t i16temp;
 	int rc = 0;
 	struct msg_store_chunk *mcs;
 
 	read_e(db_fd, &i64temp, sizeof(dbid_t));
 	msg->store_id = i64temp;
 
-	read_e(db_fd, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	if(slen){
-		msg->source_id = calloc(slen+1, sizeof(char));
-		if(!msg->source_id){
-			fclose(db_fd);
-			fprintf(stderr, "Error: Out of memory.");
-			return 1;
-		}
-		if(fread(msg->source_id, 1, slen, db_fd) != slen){
+	rc = persist__read_string(db_fd, &msg->source_id);
+	if(rc){
+		fprintf(stderr, "Error: Corrupt persistent database.");
+		fclose(db_fd);
+		return rc;
+	}
+	if(db_version == 4){
+		rc = persist__read_string(db_fd, &msg->source_username);
+		if(rc){
 			fprintf(stderr, "Error: %s.", strerror(errno));
 			fclose(db_fd);
 			free(msg->source_id);
+			free(msg->topic);
+			free(msg->payload);
+			free(msg->source_id);
 			return 1;
 		}
+		read_e(db_fd, &i16temp, sizeof(uint16_t));
+		msg->source_port = ntohs(i16temp);
 	}
+
+
 	read_e(db_fd, &i16temp, sizeof(uint16_t));
 	msg->source_mid = ntohs(i16temp);
 
 	read_e(db_fd, &i16temp, sizeof(uint16_t));
 	msg->mid = ntohs(i16temp);
 
-	read_e(db_fd, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	if(slen){
-		msg->topic = calloc(slen+1, sizeof(char));
-		if(!msg->topic){
-			fclose(db_fd);
-			free(msg->source_id);
-			fprintf(stderr, "Error: Out of memory.");
-			return 1;
-		}
-		if(fread(msg->topic, 1, slen, db_fd) != slen){
-			fprintf(stderr, "Error: %s.", strerror(errno));
-			fclose(db_fd);
-			free(msg->source_id);
-			free(msg->topic);
-			return 1;
-		}
-	}else{
-		fprintf(stderr, "Error: Invalid msg_store chunk when restoring persistent database.");
+	rc = persist__read_string(db_fd, &msg->topic);
+	if(rc){
+		fprintf(stderr, "Error: Corrupt persistent database.");
 		fclose(db_fd);
-		free(msg->source_id);
-		return 1;
+		return rc;
 	}
+
 	read_e(db_fd, &msg->qos, sizeof(uint8_t));
 	read_e(db_fd, &msg->retain, sizeof(uint8_t));
 	
@@ -415,29 +425,23 @@ static int db__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
 
 static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_sub *sub)
 {
-	uint16_t i16temp, slen;
 	int rc = 0;
 	struct client_chunk *cc;
 
-	read_e(db_fd, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	sub->client_id = calloc(slen+1, sizeof(char));
-	if(!sub->client_id){
+	rc = persist__read_string(db_fd, &sub->client_id);
+	if(rc){
+		fprintf(stderr, "Error: Corrupt persistent database.");
 		fclose(db_fd);
-		fprintf(stderr, "Error: Out of memory.");
-		return 1;
+		return rc;
 	}
-	read_e(db_fd, sub->client_id, slen);
-	read_e(db_fd, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	sub->topic = calloc(slen+1, sizeof(char));
-	if(!sub->topic){
+
+	rc = persist__read_string(db_fd, &sub->topic);
+	if(rc){
+		fprintf(stderr, "Error: Corrupt persistent database.");
 		fclose(db_fd);
-		fprintf(stderr, "Error: Out of memory.");
-		free(sub->client_id);
-		return 1;
+		return rc;
 	}
-	read_e(db_fd, sub->topic, slen);
+
 	read_e(db_fd, &sub->qos, sizeof(uint8_t));
 
 	if(client_stats){
diff --git a/src/handle_connect.c b/src/handle_connect.c
index b9b0fef..f47edb3 100644
--- a/src/handle_connect.c
+++ b/src/handle_connect.c
@@ -123,13 +123,11 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
 	uint8_t username_flag, password_flag;
 	char *username = NULL, *password = NULL;
 	int rc;
-	struct mosquitto__acl_user *acl_tail;
 	struct mosquitto *found_context;
 	int slen;
 	uint16_t slen16;
 	struct mosquitto__subleaf *leaf;
 	int i;
-	struct mosquitto__security_options *security_opts;
 #ifdef WITH_TLS
 	X509 *client_cert = NULL;
 	X509_NAME *name;
@@ -613,36 +611,8 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
 		do_disconnect(db, found_context);
 	}
 
-	/* Associate user with its ACL, assuming we have ACLs loaded. */
-	if(db->config->per_listener_settings){
-		if(!context->listener){
-			return 1;
-		}
-		security_opts = &context->listener->security_options;
-	}else{
-		security_opts = &db->config->security_options;
-	}
-
-	if(security_opts->acl_list){
-		acl_tail = security_opts->acl_list;
-		while(acl_tail){
-			if(context->username){
-				if(acl_tail->username && !strcmp(context->username, acl_tail->username)){
-					context->acl_list = acl_tail;
-					break;
-				}
-			}else{
-				if(acl_tail->username == NULL){
-					context->acl_list = acl_tail;
-					break;
-				}
-			}
-			acl_tail = acl_tail->next;
-		}
-	}else{
-		context->acl_list = NULL;
-	}
-
+	rc = acl__find_acls(db, context);
+	if(rc) return rc;
 
 	if(will_struct){
 		context->will = will_struct;
diff --git a/src/handle_publish.c b/src/handle_publish.c
index 54976af..76b3ee8 100644
--- a/src/handle_publish.c
+++ b/src/handle_publish.c
@@ -184,7 +184,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
 	}
 	if(!stored){
 		dup = 0;
-		if(db__message_store(db, context->id, mid, topic, qos, payloadlen, &payload, retain, &stored, 0)){
+		if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, 0)){
 			return 1;
 		}
 	}else{
@@ -229,7 +229,7 @@ process_bad_message:
 		case 2:
 			db__message_store_find(context, mid, &stored);
 			if(!stored){
-				if(db__message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, 0)){
+				if(db__message_store(db, context, mid, NULL, qos, 0, NULL, false, &stored, 0)){
 					return 1;
 				}
 				res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored);
diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h
index 0b69a8c..c365598 100644
--- a/src/mosquitto_broker_internal.h
+++ b/src/mosquitto_broker_internal.h
@@ -248,6 +248,7 @@ struct mosquitto__config {
 	bool allow_duplicate_messages;
 	int autosave_interval;
 	bool autosave_on_changes;
+	bool check_retain_source;
 	char *clientid_prefixes;
 	bool connection_messages;
 	bool daemon;
@@ -312,6 +313,8 @@ struct mosquitto_msg_store{
 	struct mosquitto_msg_store *prev;
 	dbid_t db_id;
 	char *source_id;
+	char *source_username;
+	struct mosquitto__listener *source_listener;
 	char **dest_ids;
 	int dest_id_count;
 	int ref_count;
@@ -553,7 +556,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
 void db__message_dequeue_first(struct mosquitto *context);
 int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);
 int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
-int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
+int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
 int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
 void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);
 void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store);
@@ -607,6 +610,7 @@ void bridge__packet_cleanup(struct mosquitto *context);
 /* ============================================================
  * Security related functions
  * ============================================================ */
+int acl__find_acls(struct mosquitto_db *db, struct mosquitto *context);
 int mosquitto_security_module_init(struct mosquitto_db *db);
 int mosquitto_security_module_cleanup(struct mosquitto_db *db);
 
diff --git a/src/persist.c b/src/persist.c
index 3299356..2f40086 100644
--- a/src/persist.c
+++ b/src/persist.c
@@ -39,6 +39,8 @@ static uint32_t db_version;
 
 
 static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos);
+static int persist__read_string(FILE *db_fptr, char **str);
+static int persist__write_string(FILE *db_fptr, const char *str, bool nullok);
 
 static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid)
 {
@@ -139,7 +141,7 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
 	uint32_t length;
 	dbid_t i64temp;
 	uint32_t i32temp;
-	uint16_t i16temp, slen, tlen;
+	uint16_t i16temp, tlen;
 	uint8_t i8temp;
 	struct mosquitto_msg_store *stored;
 	bool force_no_retain;
@@ -168,10 +170,19 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
 		}else{
 			tlen = 0;
 		}
-		length = htonl(sizeof(dbid_t) + 2+strlen(stored->source_id) +
+		length = sizeof(dbid_t) + 2+strlen(stored->source_id) +
 				sizeof(uint16_t) + sizeof(uint16_t) +
 				2+tlen + sizeof(uint32_t) +
-				stored->payloadlen + sizeof(uint8_t) + sizeof(uint8_t));
+				stored->payloadlen + sizeof(uint8_t) + sizeof(uint8_t)
+				+ 2*sizeof(uint16_t);
+
+		if(stored->source_id){
+			length += strlen(stored->source_id);
+		}
+		if(stored->source_username){
+			length += strlen(stored->source_username);
+		}
+		length = htonl(length);
 
 		i16temp = htons(DB_CHUNK_MSG_STORE);
 		write_e(db_fptr, &i16temp, sizeof(uint16_t));
@@ -180,12 +191,15 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
 		i64temp = stored->db_id;
 		write_e(db_fptr, &i64temp, sizeof(dbid_t));
 
-		slen = strlen(stored->source_id);
-		i16temp = htons(slen);
-		write_e(db_fptr, &i16temp, sizeof(uint16_t));
-		if(slen){
-			write_e(db_fptr, stored->source_id, slen);
+		if(persist__write_string(db_fptr, stored->source_id, false)) return 1;
+		if(persist__write_string(db_fptr, stored->source_username, true)) return 1;
+		if(stored->source_listener){
+			i16temp = htons(stored->source_listener->port);
+		}else{
+			i16temp = 0;
 		}
+		write_e(db_fptr, &i16temp, sizeof(uint16_t));
+
 
 		i16temp = htons(stored->source_mid);
 		write_e(db_fptr, &i16temp, sizeof(uint16_t));
@@ -214,6 +228,7 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
 		if(stored->payloadlen){
 			write_e(db_fptr, UHPA_ACCESS_PAYLOAD(stored), (unsigned int)stored->payloadlen);
 		}
+
 		stored = stored->next;
 	}
 
@@ -265,6 +280,60 @@ error:
 	return 1;
 }
 
+
+static int persist__read_string(FILE *db_fptr, char **str)
+{
+	uint16_t i16temp;
+	uint16_t slen;
+	char *s = NULL;
+
+	if(fread(&i16temp, 1, sizeof(uint16_t), db_fptr) != sizeof(uint16_t)){
+		return MOSQ_ERR_INVAL;
+	}
+
+	slen = ntohs(i16temp);
+	if(slen){
+		s = mosquitto__malloc(slen+1);
+		if(!s){
+			fclose(db_fptr);
+			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
+			return MOSQ_ERR_NOMEM;
+		}
+		if(fread(s, 1, slen, db_fptr) != slen){
+			mosquitto__free(s);
+			return MOSQ_ERR_NOMEM;
+		}
+		s[slen] = '\0';
+	}
+
+	*str = s;
+	return MOSQ_ERR_SUCCESS;
+}
+
+
+static int persist__write_string(FILE *db_fptr, const char *str, bool nullok)
+{
+	uint16_t i16temp, slen;
+
+	if(str){
+		slen = strlen(str);
+		i16temp = htons(slen);
+		write_e(db_fptr, &i16temp, sizeof(uint16_t));
+		write_e(db_fptr, str, slen);
+	}else if(nullok){
+		i16temp = htons(0);
+		write_e(db_fptr, &i16temp, sizeof(uint16_t));
+	}else{
+		return 1;
+	}
+
+	return MOSQ_ERR_SUCCESS;
+error:
+	log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
+	return 1;
+}
+
+
 static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level)
 {
 	struct mosquitto__subhier *subhier, *subhier_tmp;
@@ -642,10 +711,10 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
 {
 	dbid_t i64temp, store_id;
 	uint32_t i32temp, payloadlen = 0;
-	uint16_t i16temp, slen, source_mid;
+	uint16_t i16temp, source_mid, source_port = 0;
 	uint8_t qos, retain;
 	mosquitto__payload_uhpa payload;
-	char *source_id = NULL;
+	struct mosquitto source;
 	char *topic = NULL;
 	int rc = 0;
 	struct mosquitto_msg_store *stored = NULL;
@@ -664,41 +733,45 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
 	read_e(db_fptr, &i64temp, sizeof(dbid_t));
 	store_id = i64temp;
 
-	read_e(db_fptr, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	if(slen){
-		source_id = mosquitto__malloc(slen+1);
-		if(!source_id){
+	memset(&source, 0, sizeof(struct mosquitto));
+
+	rc = persist__read_string(db_fptr, &source.id);
+	if(rc){
+		mosquitto__free(load);
+		return rc;
+	}
+	if(db_version == 4){
+		rc = persist__read_string(db_fptr, &source.username);
+		if(rc){
 			mosquitto__free(load);
-			fclose(db_fptr);
-			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
-			return MOSQ_ERR_NOMEM;
+			return rc;
+		}
+		read_e(db_fptr, &i16temp, sizeof(uint16_t));
+		source_port = ntohs(i16temp);
+		if(source_port){
+			for(int i=0; i<db->config->listener_count; i++){
+				if(db->config->listeners[i].port == source_port){
+					source.listener = &db->config->listeners[i];
+					break;
+				}
+			}
 		}
-		read_e(db_fptr, source_id, slen);
-		source_id[slen] = '\0';
 	}
+
 	read_e(db_fptr, &i16temp, sizeof(uint16_t));
 	source_mid = ntohs(i16temp);
 
 	/* This is the mid - don't need it */
 	read_e(db_fptr, &i16temp, sizeof(uint16_t));
 
-	read_e(db_fptr, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	if(slen){
-		topic = mosquitto__malloc(slen+1);
-		if(!topic){
-			mosquitto__free(load);
-			fclose(db_fptr);
-			mosquitto__free(source_id);
-			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
-			return MOSQ_ERR_NOMEM;
-		}
-		read_e(db_fptr, topic, slen);
-		topic[slen] = '\0';
-	}else{
-		topic = NULL;
+	rc = persist__read_string(db_fptr, &topic);
+	if(rc){
+		mosquitto__free(load);
+		fclose(db_fptr);
+		mosquitto__free(source.id);
+		return rc;
 	}
+
 	read_e(db_fptr, &qos, sizeof(uint8_t));
 	read_e(db_fptr, &retain, sizeof(uint8_t));
 	
@@ -709,7 +782,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
 		if(UHPA_ALLOC(payload, payloadlen) == 0){
 			mosquitto__free(load);
 			fclose(db_fptr);
-			mosquitto__free(source_id);
+			mosquitto__free(source.id);
 			mosquitto__free(topic);
 			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
 			return MOSQ_ERR_NOMEM;
@@ -717,8 +790,8 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
 		read_e(db_fptr, UHPA_ACCESS(payload, payloadlen), payloadlen);
 	}
 
-	rc = db__message_store(db, source_id, source_mid, topic, qos, payloadlen, &payload, retain, &stored, store_id);
-	mosquitto__free(source_id);
+	rc = db__message_store(db, &source, source_mid, topic, qos, payloadlen, &payload, retain, &stored, store_id);
+	mosquitto__free(source.id);
 
 	if(rc == MOSQ_ERR_SUCCESS){
 		load->db_id = stored->db_id;
@@ -737,7 +810,7 @@ error:
 	err = strerror(errno);
 	log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
 	fclose(db_fptr);
-	mosquitto__free(source_id);
+	mosquitto__free(source.id);
 	mosquitto__free(topic);
 	UHPA_FREE(payload, payloadlen);
 	return 1;
@@ -768,35 +841,24 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
 
 static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
 {
-	uint16_t i16temp, slen;
 	uint8_t qos;
 	char *client_id;
 	char *topic;
 	int rc = 0;
 	char *err;
 
-	read_e(db_fptr, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	client_id = mosquitto__malloc(slen+1);
-	if(!client_id){
+	rc = persist__read_string(db_fptr, &client_id);
+	if(rc){
 		fclose(db_fptr);
-		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
-		return MOSQ_ERR_NOMEM;
+		return rc;
 	}
-	read_e(db_fptr, client_id, slen);
-	client_id[slen] = '\0';
 
-	read_e(db_fptr, &i16temp, sizeof(uint16_t));
-	slen = ntohs(i16temp);
-	topic = mosquitto__malloc(slen+1);
-	if(!topic){
-		fclose(db_fptr);
-		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
+	rc = persist__read_string(db_fptr, &topic);
+	if(rc){
 		mosquitto__free(client_id);
-		return MOSQ_ERR_NOMEM;
+		fclose(db_fptr);
+		return rc;
 	}
-	read_e(db_fptr, topic, slen);
-	topic[slen] = '\0';
 
 	read_e(db_fptr, &qos, sizeof(uint8_t));
 	if(persist__restore_sub(db, client_id, topic, qos)){
@@ -852,7 +914,9 @@ int persist__restore(struct mosquitto_db *db)
 		 * Is your DB change still compatible with previous versions?
 		 */
 		if(db_version > MOSQ_DB_VERSION && db_version != 0){
-			if(db_version == 2){
+			if(db_version == 3){
+				/* Addition of source_username and source_port to msg_store chunk in v4, v1.5.6 */
+			}else if(db_version == 2){
 				/* Addition of disconnect_t to client chunk in v3. */
 			}else{
 				fclose(fptr);
diff --git a/src/persist.h b/src/persist.h
index 63a1a0c..04f2634 100644
--- a/src/persist.h
+++ b/src/persist.h
@@ -17,7 +17,7 @@ Contributors:
 #ifndef PERSIST_H
 #define PERSIST_H
 
-#define MOSQ_DB_VERSION 3
+#define MOSQ_DB_VERSION 4
 
 /* DB read/write */
 const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'};
diff --git a/src/security_default.c b/src/security_default.c
index 5a886a5..44089d9 100644
--- a/src/security_default.c
+++ b/src/security_default.c
@@ -610,6 +610,49 @@ static int acl__cleanup(struct mosquitto_db *db, bool reload)
 	return MOSQ_ERR_SUCCESS;
 }
 
+
+int acl__find_acls(struct mosquitto_db *db, struct mosquitto *context)
+{
+	struct mosquitto__acl_user *acl_tail;
+	struct mosquitto__security_options *security_opts;
+
+	/* Associate user with its ACL, assuming we have ACLs loaded. */
+	if(db->config->per_listener_settings){
+		if(!context->listener){
+			return MOSQ_ERR_INVAL;
+		}
+		security_opts = &context->listener->security_options;
+	}else{
+		security_opts = &db->config->security_options;
+	}
+
+	if(security_opts->acl_list){
+		acl_tail = security_opts->acl_list;
+		while(acl_tail){
+			if(context->username){
+				if(acl_tail->username && !strcmp(context->username, acl_tail->username)){
+					context->acl_list = acl_tail;
+					break;
+				}
+			}else{
+				if(acl_tail->username == NULL){
+					context->acl_list = acl_tail;
+					break;
+				}
+			}
+			acl_tail = acl_tail->next;
+		}
+		if(context->username && context->acl_list == NULL){
+			return MOSQ_ERR_INVAL;
+		}
+	}else{
+		context->acl_list = NULL;
+	}
+
+	return MOSQ_ERR_SUCCESS;
+}
+
+
 static int pwfile__parse(const char *file, struct mosquitto__unpwd **root)
 {
 	FILE *pwfile;
diff --git a/src/subs.c b/src/subs.c
index 5953055..bae4cb4 100644
--- a/src/subs.c
+++ b/src/subs.c
@@ -659,6 +659,27 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto_msg_store *
 		return rc;
 	}
 
+	/* Check for original source access */
+	if(db->config->check_retain_source && retained->source_id){
+		struct mosquitto retain_ctxt;
+		memset(&retain_ctxt, 0, sizeof(struct mosquitto));
+
+		retain_ctxt.id = retained->source_id;
+		retain_ctxt.username = retained->source_username;
+		retain_ctxt.listener = retained->source_listener;
+
+		rc = acl__find_acls(db, &retain_ctxt);
+		if(rc) return rc;
+
+		rc = mosquitto_acl_check(db, &retain_ctxt, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen),
+				retained->qos, retained->retain, MOSQ_ACL_WRITE);
+		if(rc == MOSQ_ERR_ACL_DENIED){
+			return MOSQ_ERR_SUCCESS;
+		}else if(rc != MOSQ_ERR_SUCCESS){
+			return rc;
+		}
+	}
+
 	if (db->config->upgrade_outgoing_qos){
 		qos = sub_qos;
 	} else {
-- 
2.19.1

