1.初始化和启动 SocketServer
var socketServer: SocketServer = null
。。。
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)
1.1.初始化 SocketServer
/**
* An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
* Acceptor has N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
private val maxQueuedRequests = config.queuedMaxRequests
private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
val requestChannel = new RequestChannel(maxQueuedRequests)
private val processors = new ConcurrentHashMap[Int, Processor]()
private var nextProcessorId = 0
private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
private var stoppedProcessingRequests = false
。。。
//省略部分代码
}
1.2.启动 SocketServer
/**
* Start the socket server. Acceptors for all the listeners are started. Processors
* are started if `startupProcessors` is true. If not, processors are only started when
* [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors
* is used to delay processing client connections until server is fully initialized, e.g.
* to ensure that all credentials have been loaded before authentications are performed.
* Acceptors are always started during `startup` so that the bound port is known when this
* method completes even when ephemeral ports are used. Incoming connections on this server
* are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
*
* @param startupProcessors Flag indicating whether `Processor`s must be started.
*/
def startup(startupProcessors: Boolean = true) {
this.synchronized {
connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
if (startupProcessors) {
startProcessors()
}
}
newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
def value = SocketServer.this.synchronized {
val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
}
ioWaitRatioMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
}.sum / processors.size
}
}
)
newGauge("MemoryPoolAvailable",
new Gauge[Long] {
def value = memoryPool.availableMemory()
}
)
newGauge("MemoryPoolUsed",
new Gauge[Long] {
def value = memoryPool.size() - memoryPool.availableMemory()
}
)
info("Started " + acceptors.size + " acceptor threads")
}
版权声明:本文为yangyijun1990原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。