继续上一篇文章:http://blog.csdn.net/hu3167343/article/details/39452983
MediaPlayerService的注册
Mediaserver的代码主要在frameworks\av\media\mediaserver\main_mediaserver.cpp中实现:
int main(int argc, char** argv)
{
……
sp<ProcessState> proc(ProcessState::self());
// 获得ServiceManager的实例
sp<IServiceManager> sm = defaultServiceManager();
// Mediaserver进程承载了好几个服务
AudioFlinger::instantiate();
MediaPlayerService::instantiate();
CameraService::instantiate();
AudioPolicyService::instantiate();
registerExtensions();
// 开始循环接收消息
ProcessState::self()->startThreadPool();
IPCThreadState::self()->joinThreadPool();
}
由main函数可知,MediaServer中运行着多个Service,下面我们主要以MediaPlayerService为例讲解其通过Binder机制与ServiceManager进程通信的过程。
MediaPlayerService::instantiate();调用的实现如下:
void MediaPlayerService::instantiate() {
defaultServiceManager()->addService(
String16("media.player"), new MediaPlayerService());
}
在上一篇文章中我们已经说到defaultServiceManager函数返回的是一个BpServiceManager对象。类BpServiceManager继承自IServiceManager,因此addService如下所示:
virtual status_t addService(const String16& name, const sp<IBinder>& service,
bool allowIsolated)
{
// Parcel可以把它当成是一个数据包类
Parcel data, reply;
data.writeInterfaceToken(IServiceManager::getInterfaceDescriptor());
data.writeString16(name);
data.writeStrongBinder(service);
data.writeInt32(allowIsolated ? 1 : 0);
// remote()返回的是mRemote,也就是BpBinder对象
status_t err = remote()->transact(ADD_SERVICE_TRANSACTION, data, &reply);
return err == NO_ERROR ? reply.readExceptionCode() : err;
}
BpBinder的transact实现如下:
status_t BpBinder::transact(
uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
// Once a binder has died, it will never come back to life.
if (mAlive) {
status_t status = IPCThreadState::self()->transact(
mHandle, code, data, reply, flags);
if (status == DEAD_OBJECT) mAlive = 0;
return status;
}
return DEAD_OBJECT;
}
上一篇文章中也说过BpBinder并不参与实际的底层Binder设备通信的工作,这里也看到,在transact函数中,其最后把工作传给了IPCThreadState。
接着来看IPCThreadState self函数的实现:
IPCThreadState* IPCThreadState::self()
{
// 第一次进来为false
if (gHaveTLS) {
restart:
const pthread_key_t k = gTLS;
/*
TLS是Thread Local Storage即线程本地存储空间的简称。
这种空间每个线程都有,且线程之间不共享这些空间。
通过pthread_getspecific/ pthread_setspecific函数可以获取/设置这些空间中的
内容。
显然,这里的TLS中保存了IPCThreadState的对象。
*/
IPCThreadState* st = (IPCThreadState*)pthread_getspecific(k);
if (st) return st;
return new IPCThreadState;
}
if (gShutdown) return NULL;
pthread_mutex_lock(&gTLSMutex);
if (!gHaveTLS) {
if (pthread_key_create(&gTLS, threadDestructor) != 0) {
pthread_mutex_unlock(&gTLSMutex);
return NULL;
}
gHaveTLS = true;
}
pthread_mutex_unlock(&gTLSMutex);
goto restart;
}
接着来看看IPCThreadState的构造函数。
IPCThreadState::IPCThreadState()
: mProcess(ProcessState::self()),
mMyThreadId(androidGetTid()),
mStrictModePolicy(0),
mLastTransactionBinderFlags(0)
{
// 在构造函数中,把自己设置到了TLS中。
pthread_setspecific(gTLS, this);
clearCaller();
// mIn和mOut两个都是Parcel,可以把它看成发送和接受数据的缓冲区。
mIn.setDataCapacity(256);
mOut.setDataCapacity(256);
}
主要的数据传输工作还是在IPCThreadState的transact函数中:
status_t IPCThreadState::transact(int32_t handle,
uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags)
{
status_t err = data.errorCheck();
flags |= TF_ACCEPT_FDS;
…….
if (err == NO_ERROR) {
// BC_TRANSACTION是应用程序向底层binder设备发送消息的消息码,
// 底层binder设备向应用程序回复消息则以BR_开头。
err = writeTransactionData(BC_TRANSACTION, flags, handle, code, data, NULL);
}
if ((flags & TF_ONE_WAY) == 0) {
if (reply) {
err = waitForResponse(reply);
} else {
Parcel fakeReply;
err = waitForResponse(&fakeReply);
}
} else {
err = waitForResponse(NULL, NULL);
}
return err;
}
这个函数很简单,就是调用writeTransactionData函数向底层发送数据,然后调用waitForResponse等待底层的应答。
接着来看writeTransactionData:
status_t IPCThreadState::writeTransactionData(int32_t cmd, uint32_t binderFlags,
int32_t handle, uint32_t code, const Parcel& data, status_t* statusBuffer)
{
binder_transaction_data tr;
// 此处的handle标识了要发送数据的目标端,0即代表ServiceManager
tr.target.handle = handle;
tr.code = code;
tr.flags = binderFlags;
tr.cookie = 0;
tr.sender_pid = 0;
tr.sender_euid = 0;
const status_t err = data.errorCheck();
if (err == NO_ERROR) {
tr.data_size = data.ipcDataSize();
tr.data.ptr.buffer = data.ipcData();
tr.offsets_size = data.ipcObjectsCount()*sizeof(size_t);
tr.data.ptr.offsets = data.ipcObjects();
} else if (statusBuffer) {
tr.flags |= TF_STATUS_CODE;
*statusBuffer = err;
tr.data_size = sizeof(status_t);
tr.data.ptr.buffer = statusBuffer;
tr.offsets_size = 0;
tr.data.ptr.offsets = NULL;
} else {
return (mLastError = err);
}
// 把要发送的数据写到mOut中去,而不是真正的发送
mOut.writeInt32(cmd);
mOut.write(&tr, sizeof(tr));
return NO_ERROR;
}
这个函数并不是真正的发送数据,而是将要发送的数据写到了mOut中去,等待后面的发送操作。
发送出去之后,再等待答复waitForResponse:
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
int32_t cmd;
int32_t err;
while (1) {
/*
原来最终还是在IPCThreadState类中通过talkWithDriver与底层Binder设备建立数据通信,包括发送数据和接收数据。
*/
if ((err=talkWithDriver()) < NO_ERROR) break;
err = mIn.errorCheck();
if (err < NO_ERROR) break;
if (mIn.dataAvail() == 0) continue;
cmd = mIn.readInt32();
IF_LOG_COMMANDS() {
alog << "Processing waitForResponse Command: "
<< getReturnString(cmd) << endl;
}
switch (cmd) {
case BR_TRANSACTION_COMPLETE:
if (!reply && !acquireResult) goto finish;
break;
……
default:
err = executeCommand(cmd);
if (err != NO_ERROR) goto finish;
break;
}
}
finish:
if (err != NO_ERROR) {
if (acquireResult) *acquireResult = err;
if (reply) reply->setError(err);
mLastError = err;
}
return err;
}
加入我们在发送了数据之后,然后马上得到了回应,此时将执行executeCommand:
status_t IPCThreadState::executeCommand(int32_t cmd)
{
BBinder* obj;
RefBase::weakref_type* refs;
status_t result = NO_ERROR;
switch (cmd) {
case BR_ERROR:
result = mIn.readInt32();
break;
…...
case BR_TRANSACTION:
{
binder_transaction_data tr;
result = mIn.read(&tr, sizeof(tr));
ALOG_ASSERT(result == NO_ERROR,
"Not enough command data for brTRANSACTION");
if (result != NO_ERROR) break;
Parcel buffer;
buffer.ipcSetDataReference(
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t), freeBuffer, this);
const pid_t origPid = mCallingPid;
const uid_t origUid = mCallingUid;
mCallingPid = tr.sender_pid;
mCallingUid = tr.sender_euid;
int curPrio = getpriority(PRIO_PROCESS, mMyThreadId);
if (gDisableBackgroundScheduling) {
if (curPrio > ANDROID_PRIORITY_NORMAL) {
// We have inherited a reduced priority from the caller, but do not
// want to run in that state in this process. The driver set our
// priority already (though not our scheduling class), so bounce
// it back to the default before invoking the transaction.
setpriority(PRIO_PROCESS, mMyThreadId, ANDROID_PRIORITY_NORMAL);
}
} else {
if (curPrio >= ANDROID_PRIORITY_BACKGROUND) {
// We want to use the inherited priority from the caller.
// Ensure this thread is in the background scheduling class,
// since the driver won't modify scheduling classes for us.
// The scheduling group is reset to default by the caller
// once this method returns after the transaction is complete.
set_sched_policy(mMyThreadId, SP_BACKGROUND);
}
}
//ALOGI(">>>> TRANSACT from pid %d uid %d\n", mCallingPid, mCallingUid);
Parcel reply;
if (tr.target.ptr) {
sp<BBinder> b((BBinder*)tr.cookie);
const status_t error = b->transact(tr.code, buffer, &reply, tr.flags);
if (error < NO_ERROR) reply.setError(error);
} else {
const status_t error = the_context_object->transact(tr.code, buffer, &reply, tr.flags);
if (error < NO_ERROR) reply.setError(error);
}
//ALOGI("<<<< TRANSACT from pid %d restore pid %d uid %d\n",
// mCallingPid, origPid, origUid);
if ((tr.flags & TF_ONE_WAY) == 0) {
LOG_ONEWAY("Sending reply to %d!", mCallingPid);
sendReply(reply, 0);
} else {
LOG_ONEWAY("NOT sending reply to %d!", mCallingPid);
}
mCallingPid = origPid;
mCallingUid = origUid;
IF_LOG_TRANSACTIONS() {
TextOutput::Bundle _b(alog);
alog << "BC_REPLY thr " << (void*)pthread_self() << " / obj "
<< tr.target.ptr << ": " << indent << reply << dedent << endl;
}
}
break;
case BR_DEAD_BINDER:
{
BpBinder *proxy = (BpBinder*)mIn.readInt32();
proxy->sendObituary();
mOut.writeInt32(BC_DEAD_BINDER_DONE);
mOut.writeInt32((int32_t)proxy);
} break;
case BR_CLEAR_DEATH_NOTIFICATION_DONE:
{
BpBinder *proxy = (BpBinder*)mIn.readInt32();
proxy->getWeakRefs()->decWeak(proxy);
} break;
……
case BR_SPAWN_LOOPER:
mProcess->spawnPooledThread(false);
break;
default:
printf("*** BAD COMMAND %d received from Binder driver\n", cmd);
result = UNKNOWN_ERROR;
break;
}
if (result != NO_ERROR) {
mLastError = result;
}
return result;
}
上面的writeTransactionData和waitForResponse函数都没有看到与binder通信的部分,其实秘密就在talkWithDriver函数中:
status_t IPCThreadState::talkWithDriver(bool doReceive)
{
if (mProcess->mDriverFD <= 0) {
return -EBADF;
}
binder_write_read bwr;
// Is the read buffer empty?
const bool needRead = mIn.dataPosition() >= mIn.dataSize();
// We don't want to write anything if we are still reading
// from data left in the input buffer and the caller
// has requested to read the next data.
const size_t outAvail = (!doReceive || needRead) ? mOut.dataSize() : 0;
bwr.write_size = outAvail;
bwr.write_buffer = (long unsigned int)mOut.data();
// This is what we'll read.
if (doReceive && needRead) {
bwr.read_size = mIn.dataCapacity();
bwr.read_buffer = (long unsigned int)mIn.data();
} else {
bwr.read_size = 0;
bwr.read_buffer = 0;
}
// Return immediately if there is nothing to do.
if ((bwr.write_size == 0) && (bwr.read_size == 0)) return NO_ERROR;
bwr.write_consumed = 0;
bwr.read_consumed = 0;
status_t err;
do {
#if defined(HAVE_ANDROID_OS)
if (ioctl(mProcess->mDriverFD, BINDER_WRITE_READ, &bwr) >= 0)
err = NO_ERROR;
else
err = -errno;
#else
err = INVALID_OPERATION;
#endif
if (mProcess->mDriverFD <= 0) {
err = -EBADF;
}
} while (err == -EINTR);
if (err >= NO_ERROR) {
if (bwr.write_consumed > 0) {
if (bwr.write_consumed < (ssize_t)mOut.dataSize())
mOut.remove(0, bwr.write_consumed);
else
mOut.setDataSize(0);
}
if (bwr.read_consumed > 0) {
mIn.setDataSize(bwr.read_consumed);
mIn.setDataPosition(0);
}
return NO_ERROR;
}
return err;
}
原来应用层最后是调用ioctl来和底层binder设备进行数据的互通的,包括读、写。此时,就已经往底层binder驱动中注册了MediaPlayerService服务了。
开始消息循环
再来看Mediaserver min函数的最后两句代码:
// 开始循环接收消息
ProcessState::self()->startThreadPool();
IPCThreadState::self()->joinThreadPool();
首先来看startThreadPool函数的实现:
void ProcessState::startThreadPool()
{
AutoMutex _l(mLock);
if (!mThreadPoolStarted) {
mThreadPoolStarted = true;
// 注意此处参数为true
spawnPooledThread(true);
}
}
void ProcessState::spawnPooledThread(bool isMain)
{
if (mThreadPoolStarted) {
String8 name = makeBinderThreadName();
ALOGV("Spawning new pooled thread, name=%s\n", name.string());
sp<Thread> t = new PoolThread(isMain);
t->run(name.string());
}
}
调用new PoolThread(isMain)创建了一个新线程。PoolThread是继承自Thread的线程类。
class PoolThread : public Thread
{
public:
PoolThread(bool isMain)
: mIsMain(isMain)
{
}
protected:
virtual bool threadLoop()
{
// 线程主循环函数也很简单,就是调用了IPCThreadState类的joinThreadPool
IPCThreadState::self()->joinThreadPool(mIsMain);
return false;
}
const bool mIsMain;
};
煞费苦心,新创建的线程最后调用的还是IPCThreadState的joinThreadPool函数。我们主要来看它的实现:
void IPCThreadState::joinThreadPool(bool isMain)
{
mOut.writeInt32(isMain ? BC_ENTER_LOOPER : BC_REGISTER_LOOPER);
// This thread may have been spawned by a thread that was in the background
// scheduling group, so first we will make sure it is in the foreground
// one to avoid performing an initial transaction in the background.
set_sched_policy(mMyThreadId, SP_FOREGROUND);
status_t result;
do {
processPendingDerefs();
// now get the next command to be processed, waiting if necessary
result = getAndExecuteCommand();
if (result < NO_ERROR && result != TIMED_OUT && result != -ECONNREFUSED && result != -EBADF) {
ALOGE("getAndExecuteCommand(fd=%d) returned unexpected error %d, aborting",
mProcess->mDriverFD, result);
abort();
}
// Let this thread exit the thread pool if it is no longer
// needed and it is not the main process thread.
if(result == TIMED_OUT && !isMain) {
break;
}
} while (result != -ECONNREFUSED && result != -EBADF);
LOG_THREADPOOL("**** THREAD %p (PID %d) IS LEAVING THE THREAD POOL err=%p\n",
(void*)pthread_self(), getpid(), (void*)result);
mOut.writeInt32(BC_EXIT_LOOPER);
talkWithDriver(false);
}
主要实现还是在getAndExecuteCommand函数中:
status_t IPCThreadState::getAndExecuteCommand()
{
status_t result;
int32_t cmd;
result = talkWithDriver();
if (result >= NO_ERROR) {
size_t IN = mIn.dataAvail();
if (IN < sizeof(int32_t)) return result;
cmd = mIn.readInt32();
IF_LOG_COMMANDS() {
alog << "Processing top-level Command: "
<< getReturnString(cmd) << endl;
}
result = executeCommand(cmd);
// After executing the command, ensure that the thread is returned to the
// foreground cgroup before rejoining the pool. The driver takes care of
// restoring the priority, but doesn't do anything with cgroups so we
// need to take care of that here in userspace. Note that we do make
// sure to go in the foreground after executing a transaction, but
// there are other callbacks into user code that could have changed
// our group so we want to make absolutely sure it is put back.
set_sched_policy(mMyThreadId, SP_FOREGROUND);
}
return result;
}
getAndExecuteCommand是通过talkWithDriver函数与底层binder设备进行通信的,得到返回数据后调用executeCommand处理相应的消息。