引言
broker的核心类
实现了MessageStore接口,是broker默认的消息存储实现类,全面负责broker的消息存储和消息获取。
维护CommitLog
MessageStore
/**
 * Contract of a message store: lifecycle management (load/start/shutdown/destroy),
 * storing single and batched messages (synchronously or asynchronously), querying
 * messages and offsets, and exposing runtime/maintenance operations.
 */
public interface MessageStore {
/**
 * Load previously stored messages.
 *
 * @return true if success; false otherwise.
 */
boolean load();
/**
 * Launch this message store.
 *
 * @throws Exception if there is any error.
 */
void start() throws Exception;
/**
 * Shutdown this message store.
 */
void shutdown();
/**
 * Destroy this message store. Generally, all persistent files should be removed after invocation.
 */
void destroy();
/**
 * Store a message in an async manner, so that the processor can handle the next request
 * rather than wait for the result; when the result is completed, the client is notified
 * in an async manner.
 *
 * <p>The default implementation simply calls {@link #putMessage(MessageExtBrokerInner)}
 * on the calling thread and wraps its result in an already-completed future.
 *
 * @param msg Message instance to store.
 * @return a CompletableFuture for the result of the store operation.
 */
default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
return CompletableFuture.completedFuture(putMessage(msg));
}
/**
 * Store a batch of messages in an async manner.
 *
 * <p>The default implementation simply calls {@link #putMessages(MessageExtBatch)}
 * on the calling thread and wraps its result in an already-completed future.
 *
 * @param messageExtBatch the message batch.
 * @return a CompletableFuture for the result of the store operation.
 */
default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}
/**
 * Store a message into the store.
 *
 * @param msg Message instance to store.
 * @return result of the store operation.
 */
PutMessageResult putMessage(final MessageExtBrokerInner msg);
/**
 * Store a batch of messages.
 *
 * @param messageExtBatch Message batch.
 * @return result of storing the batch of messages.
 */
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
/**
 * Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
 * from the given <code>offset</code>. Resulting messages will further be screened using the provided message filter.
 *
 * @param group Consumer group that launches this query.
 * @param topic Topic to query.
 * @param queueId Queue ID to query.
 * @param offset Logical offset to start from.
 * @param maxMsgNums Maximum count of messages to query.
 * @param messageFilter Message filter used to screen desired messages.
 * @return Matched messages.
 */
GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter);
/**
 * Get the maximum offset of the topic queue.
 *
 * @param topic Topic name.
 * @param queueId Queue ID.
 * @return Maximum offset at present.
 */
long getMaxOffsetInQueue(final String topic, final int queueId);
/**
 * Get the minimum offset of the topic queue.
 *
 * @param topic Topic name.
 * @param queueId Queue ID.
 * @return Minimum offset at present.
 */
long getMinOffsetInQueue(final String topic, final int queueId);
/**
 * Get the offset of the message in the commit log, which is also known as physical offset.
 *
 * @param topic Topic of the message to lookup.
 * @param queueId Queue ID.
 * @param consumeQueueOffset offset of consume queue.
 * @return physical offset.
 */
long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);
/**
 * Look up the physical offset of the message whose store timestamp is as specified.
 *
 * <p>NOTE(review): the method name suggests the returned value is an offset within the
 * consume queue rather than a commit-log (physical) offset - confirm against the implementation.
 *
 * @param topic Topic of the message.
 * @param queueId Queue ID.
 * @param timestamp Timestamp to look up.
 * @return physical offset which matches.
 */
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
/**
 * Look up the message by the given commit log offset.
 *
 * @param commitLogOffset physical offset.
 * @return Message whose physical offset is as specified.
 */
MessageExt lookMessageByOffset(final long commitLogOffset);
/**
 * Get one message from the specified commit log offset.
 *
 * @param commitLogOffset commit log offset.
 * @return wrapped result of the message.
 */
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
/**
 * Get one message of a known size from the specified commit log offset.
 *
 * @param commitLogOffset commit log offset.
 * @param msgSize message size.
 * @return wrapped result of the message.
 */
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
/**
 * Get the running information of this store.
 *
 * @return message store running info.
 */
String getRunningDataInfo();
/**
 * Message store runtime information, which should generally contain various statistical information.
 *
 * @return runtime information of the message store in the format of key-value pairs.
 */
HashMap<String, String> getRuntimeInfo();
/**
 * Get the maximum commit log offset.
 *
 * @return maximum commit log offset.
 */
long getMaxPhyOffset();
/**
 * Get the minimum commit log offset.
 *
 * @return minimum commit log offset.
 */
long getMinPhyOffset();
/**
 * Get the store time of the earliest message in the given queue.
 *
 * @param topic Topic of the messages to query.
 * @param queueId Queue ID to find.
 * @return store time of the earliest message.
 */
long getEarliestMessageTime(final String topic, final int queueId);
/**
 * Get the store time of the earliest message in this store.
 *
 * @return timestamp of the earliest message in this store.
 */
long getEarliestMessageTime();
/**
 * Get the store time of the message specified.
 *
 * @param topic message topic.
 * @param queueId queue ID.
 * @param consumeQueueOffset consume queue offset.
 * @return store timestamp of the message.
 */
long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);
/**
 * Get the total number of the messages in the specified queue.
 *
 * @param topic Topic.
 * @param queueId Queue ID.
 * @return total number.
 */
long getMessageTotalInQueue(final String topic, final int queueId);
/**
 * Get the raw commit log data starting from the given offset, which should be used for replication purposes.
 *
 * @param offset starting offset.
 * @return commit log data.
 */
SelectMappedBufferResult getCommitLogData(final long offset);
/**
 * Append data to the commit log.
 *
 * @param startOffset starting offset.
 * @param data data to append.
 * @return true if success; false otherwise.
 */
boolean appendToCommitLog(final long startOffset, final byte[] data);
/**
 * Execute file deletion manually.
 */
void executeDeleteFilesManually();
/**
 * Query messages by the given key.
 *
 * @param topic topic of the message.
 * @param key message key.
 * @param maxNum maximum number of the messages possible.
 * @param begin begin timestamp.
 * @param end end timestamp.
 * @return result of the query.
 */
QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end);
/**
 * Update the HA master address.
 *
 * @param newAddr new address.
 */
void updateHaMasterAddress(final String newAddr);
/**
 * Return how much the slave falls behind.
 *
 * @return number of bytes that the slave falls behind.
 */
long slaveFallBehindMuch();
/**
 * Return the current timestamp of the store.
 *
 * @return current time in milliseconds since 1970-01-01.
 */
long now();
/**
 * Clean unused topics.
 *
 * @param topics all valid topics.
 * @return number of the topics deleted.
 */
int cleanUnusedTopic(final Set<String> topics);
/**
 * Clean expired consume queues.
 */
void cleanExpiredConsumerQueue();
/**
 * Check if the given message has been swapped out of the memory.
 *
 * @param topic topic.
 * @param queueId queue ID.
 * @param consumeOffset consume queue offset.
 * @return true if the message is no longer in memory; false otherwise.
 */
boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
/**
 * Get the number of bytes that have been stored in the commit log and not yet dispatched to the consume queue.
 *
 * @return number of the bytes to dispatch.
 */
long dispatchBehindBytes();
/**
 * Flush the message store to persist all data.
 *
 * @return maximum offset flushed to the persistent storage device.
 */
long flush();
/**
 * Reset the written offset.
 *
 * @param phyOffset new offset.
 * @return true if success; false otherwise.
 */
boolean resetWriteOffset(long phyOffset);
/**
 * Get the confirm offset.
 *
 * @return confirm offset.
 */
long getConfirmOffset();
/**
 * Set the confirm offset.
 *
 * @param phyOffset confirm offset to set.
 */
void setConfirmOffset(long phyOffset);
/**
 * Check if the operating system page cache is busy or not.
 *
 * @return true if the OS page cache is busy; false otherwise.
 */
boolean isOSPageCacheBusy();
/**
 * Get the lock time in milliseconds of the store so far.
 *
 * @return lock time in milliseconds.
 */
long lockTimeMills();
/**
 * Check if the transient store pool is deficient.
 *
 * @return true if the transient store pool is running out; false otherwise.
 */
boolean isTransientStorePoolDeficient();
/**
 * Get the dispatcher list.
 *
 * @return list of the dispatchers.
 */
LinkedList<CommitLogDispatcher> getDispatcherList();
/**
 * Get the consume queue of the topic/queue.
 *
 * @param topic Topic.
 * @param queueId Queue ID.
 * @return Consume queue.
 */
ConsumeQueue getConsumeQueue(String topic, int queueId);
/**
 * Get the BrokerStatsManager of the message store.
 *
 * @return BrokerStatsManager.
 */
BrokerStatsManager getBrokerStatsManager();
/**
 * Handle the schedule message service for the given broker role.
 *
 * <p>NOTE(review): the concrete action taken per role is defined by implementations
 * and is not visible from this interface - confirm before relying on it.
 *
 * @param brokerRole broker role to handle the schedule message service for.
 */
void handleScheduleMessageService(BrokerRole brokerRole);
}
主要方法
- 异步存储和同步存储
- 普通消息和批量消息
- 都是通过调用CommitLog实现逻辑
生产者普通消息的异步存储方法
/**
 * Asynchronously stores a single message by delegating to the commit log.
 *
 * <p>The message is rejected up front, with an already-completed future, when the store
 * status check does not yield {@code PUT_OK} or when the message check yields
 * {@code MESSAGE_ILLEGAL}. Otherwise the commit log's future is returned, and a callback
 * attached to it records the elapsed time and counts failed puts.
 *
 * @param msg message to store.
 * @return future carrying the result of the store operation.
 */
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // Store-level precondition (per the original author's note: running state, broker role,
    // writability and OS page cache busy).
    final PutMessageStatus storeState = this.checkStoreStatus();
    if (PutMessageStatus.PUT_OK != storeState) {
        return CompletableFuture.completedFuture(new PutMessageResult(storeState, null));
    }

    // Message-level precondition (per the original author's note: properties and topic length
    // limits - exact bounds live in checkMessage; confirm there).
    final PutMessageStatus messageState = this.checkMessage(msg);
    if (PutMessageStatus.MESSAGE_ILLEGAL == messageState) {
        return CompletableFuture.completedFuture(new PutMessageResult(messageState, null));
    }

    final long startedAt = this.getSystemClock().now();
    final CompletableFuture<PutMessageResult> pendingResult = this.commitLog.asyncPutMessage(msg);

    // Collect put-latency and failure metrics once the commit log reports a result.
    pendingResult.thenAccept(outcome -> {
        final long costMillis = this.getSystemClock().now() - startedAt;
        if (costMillis > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", costMillis, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(costMillis);

        final boolean succeeded = outcome != null && outcome.isOk();
        if (!succeeded) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
    });

    // The caller receives the commit log's future itself, not the metrics stage.
    return pendingResult;
}
生产者普通消息的同步存储方法
/**
 * Synchronously stores a single message by delegating to the commit log.
 *
 * <p>The message is rejected up front when the store status check does not yield
 * {@code PUT_OK} or when the message check yields {@code MESSAGE_ILLEGAL}. After the
 * commit log returns, the elapsed time is recorded and a failed put is counted.
 *
 * @param msg message to store.
 * @return result of the store operation.
 */
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // Store-level precondition: anything other than PUT_OK short-circuits.
    final PutMessageStatus storeState = this.checkStoreStatus();
    if (PutMessageStatus.PUT_OK != storeState) {
        return new PutMessageResult(storeState, null);
    }

    // Message-level precondition: only MESSAGE_ILLEGAL short-circuits.
    final PutMessageStatus messageState = this.checkMessage(msg);
    if (PutMessageStatus.MESSAGE_ILLEGAL == messageState) {
        return new PutMessageResult(messageState, null);
    }

    final long startedAt = this.getSystemClock().now();
    final PutMessageResult outcome = this.commitLog.putMessage(msg);
    final long costMillis = this.getSystemClock().now() - startedAt;

    // Warn about slow puts (over 500 ms) and feed the elapsed time into the store stats.
    if (costMillis > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", costMillis, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(costMillis);

    // A missing or non-OK result counts as a failed put.
    final boolean succeeded = outcome != null && outcome.isOk();
    if (!succeeded) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return outcome;
}
版权声明:本文为wengfuying5308原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。