[Reading the Sentry source] A first look at how an event request is processed and written


An error report that a client sends to the Sentry server is called an event inside Sentry. Taking the JavaScript client as an example, the request URL used when an exception is reported looks like this:

http://localhost:8000/api/2/store/?sentry_version=7&sentry_client=raven-js%2F3.8.1&sentry_key=36db8da42fa84f83bac146be5096815c
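
As a point of reference, a request in the same shape can be reproduced with a few lines of Python. This is only a sketch: the host, port, key and payload fields below are placeholders modeled on the URL above, not a working DSN.

    import json
    import requests  # any HTTP client works; requests is assumed here

    # hypothetical DSN pieces matching the URL shown above
    store_url = 'http://localhost:8000/api/2/store/'
    params = {
        'sentry_version': '7',
        'sentry_client': 'raven-js/3.8.1',
        'sentry_key': '36db8da42fa84f83bac146be5096815c',
    }
    # a minimal event body; real clients send many more fields
    event = {
        'event_id': 'c9c9f9f3f3e34e0e9d3f0e6a1b2c3d4e',
        'message': 'Something broke',
        'level': 'error',
        'platform': 'javascript',
    }
    resp = requests.post(store_url, params=params, data=json.dumps(event))
    print(resp.status_code, resp.text)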

The Sentry backend is built on Django and splits all URLs into two groups: the api.endpoints package handles requests whose path starts with /api/0, while the web.frontend package handles everything else. The URL above is therefore dispatched to a view in the web.frontend package. In web.urls.py we find the following code, which shows that the URL is routed to StoreView:

# in web.urls.py

urlpatterns += patterns(
    '',
    # Store endpoints first since they are the most active
    url(r'^api/store/$', api.StoreView.as_view(),
        name='sentry-api-store'),
    url(r'^api/(?P<project_id>[\w_-]+)/store/$', api.StoreView.as_view(),
        name='sentry-api-store'),
    url(r'^api/(?P<project_id>\d+)/csp-report/$', api.CspReportView.as_view(),
        name='sentry-api-csp-report'),
    # ... remaining routes omitted ...
)

StoreView:

The StoreView class has two methods, get and post, and both of them call process(). process() is the entry point for handling a request, and the handling goes roughly through the following steps:

1. Filter the request

if helper.should_filter(project, data, ip_address=remote_addr):

This code inspects the fields carried in the request payload data and decides whether the request should be filtered out.

2. Rate limiting

if rate_limit is None or rate_limit.is_limited:

Here a RateLimit object decides whether the request is currently rate limited; if so, the request is simply dropped.
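
To make the idea of the is_limited check concrete, the sketch below shows a minimal fixed-window rate limiter backed by a cache counter. It is only an illustration, not Sentry's actual quota implementation; the class and field names are made up.

    import time

    class RateLimit(object):
        """Tiny illustrative rate-limit result object (hypothetical)."""
        def __init__(self, is_limited, retry_after=0):
            self.is_limited = is_limited
            self.retry_after = retry_after

    class FixedWindowLimiter(object):
        """Allow at most `limit` events per project per `window` seconds."""
        def __init__(self, cache, limit=100, window=60):
            self.cache = cache
            self.limit = limit
            self.window = window

        def check(self, project_id):
            bucket = int(time.time() // self.window)
            key = 'quota:%s:%s' % (project_id, bucket)
            # read-modify-write is not atomic, which is fine for an illustration
            count = (self.cache.get(key) or 0) + 1
            self.cache.set(key, count, self.window)
            return RateLimit(is_limited=count > self.limit,
                             retry_after=self.window)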

3. Check whether the request has already been processed, and scrub sensitive fields from it:

if scrub_data:
    # We filter data immediately before it ever gets into the queue
    sensitive_fields_key = 'sentry:sensitive_fields'

At the same time, the cache is consulted to see whether this request already exists:

cache_key = 'ev:%s:%s' % (project.id, event_id,)

if cache.get(cache_key) is not None:
    raise APIForbidden('An event with the same ID already exists (%s)' % (event_id,))

4. Insert the request content into the database:

# mutates data (strips a lot of context if not queued)
helper.insert_data_to_database(data)

This is the focus of the analysis and is covered below.

5. Write a flag into the cache so that requests with the same event_id are not processed again within the next 5 minutes, then return the response to the client:

cache.set(cache_key, '', 60 * 5)
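
Steps 3 and 5 together form a simple cache-based deduplication: step 3 rejects an event_id that is already marked, step 5 marks it after a successful insert. The condensed sketch below shows the pattern; it is illustrative only, since the real code performs the two halves at different points inside process().

    from sentry.coreapi import APIForbidden  # the exception the view raises on duplicates

    DEDUP_TTL = 60 * 5  # 5 minutes, matching the cache.set() call above

    def reject_duplicate(cache, project_id, event_id):
        """Raise if this event_id was seen recently (step 3)."""
        cache_key = 'ev:%s:%s' % (project_id, event_id)
        if cache.get(cache_key) is not None:
            raise APIForbidden('An event with the same ID already exists (%s)' % (event_id,))
        return cache_key

    def mark_processed(cache, cache_key):
        """Mark the event_id as seen for the next 5 minutes (step 5)."""
        cache.set(cache_key, '', DEDUP_TTL)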

Asynchronous processing:

Next, let's follow the execution of helper.insert_data_to_database. The function is very short: it defers the processing and the database write by throwing a task onto Celery, and returns immediately.

    def insert_data_to_database(self, data):
        # we might be passed LazyData
        if isinstance(data, LazyData):
            data = dict(data.items())
        cache_key = 'e:{1}:{0}'.format(data['project'], data['event_id'])
        default_cache.set(cache_key, data, timeout=3600)
        preprocess_event.delay(cache_key=cache_key, start_time=time())

First, if data is lazily loaded, calling items() forces the laziness to be resolved. LazyData is implemented in sentry/coreapi.py; its items() method triggers _decode(), which deserializes the request string into a dict. The data is then put into the cache (Sentry's cache uses Redis or Memcached) under a key of the form e:{event_id}:{project}, the sentry.tasks.store.preprocess_event task is pushed onto the Celery queue, and the method returns.
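
The lazy-decoding trick is worth a quick illustration. The class below is a much-simplified stand-in for LazyData (the real class in sentry/coreapi.py also handles auth, encoding and validation); it only shows the pattern of deferring the expensive decode until the data is actually accessed.

    import json

    class LazyPayload(object):
        """Simplified stand-in for LazyData: decode the raw body only on first access."""
        def __init__(self, raw_body):
            self._raw_body = raw_body
            self._data = None

        def _decode(self):
            # in Sentry this also handles base64/zlib-encoded bodies and validation
            self._data = json.loads(self._raw_body)

        def items(self):
            if self._data is None:
                self._decode()
            return self._data.items()

    payload = LazyPayload('{"event_id": "abc", "message": "boom"}')
    data = dict(payload.items())  # decoding happens here, not at construction time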

Sentry uses three asynchronous tasks to process the request and write it out. On the queue named events.preprocess_event they run in sequence, processing data and finally persisting it. The three tasks are:

1. The sentry.tasks.store.preprocess_event task

The main things this task does are:

1) Fetch data from the cache using cache_key.

2) Iterate over the plugin list and check whether any plugin provides event preprocessors for this data:

    for plugin in plugins.all(version=2):
        processors = safe_execute(plugin.get_event_preprocessors, data=data, _with_transaction=False)
        for processor in (processors or ()):
            # On the first processor found, we just defer to the process_event
            # queue to handle the actual work.
            process_event.delay(cache_key=cache_key, start_time=start_time)
            return

The remaining work is then handed over to sentry.tasks.store.process_event by pushing a new task onto the queue.
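
For a sense of what is being probed here, a version=2 plugin would expose the hook roughly as in the sketch below. This is a hypothetical plugin written from memory of the plugin API (the Plugin2 import path and the hook signature are assumptions), purely to show that get_event_preprocessors returns a list of callables operating on the event data.

    from sentry.plugins import Plugin2  # assumed base class for version=2 plugins

    def strip_debug_frames(data):
        """Hypothetical preprocessor: drop a noisy key from the event payload."""
        data.pop('debug_meta', None)
        return data

    class MyPreprocessPlugin(Plugin2):
        slug = 'my-preprocess'
        title = 'My preprocess plugin'

        def get_event_preprocessors(self, data, **kwargs):
            # returning a non-empty list is what makes preprocess_event
            # defer the event to the process_event queue
            return [strip_debug_frames]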

2. sentry.tasks.store.process_event

Its body is similar to preprocess_event: it performs further processing on data, then wraps the database write into a sentry.tasks.store.save_event task and pushes it onto the queue once more.
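
No code is quoted for this task, so the following is only a hedged reconstruction of its overall shape, pieced together from the description above and the preprocess_event snippet; names such as save_event.delay mirror the quoted code, but the real task has more error handling.

    # rough sketch of the process_event flow (illustrative, not the actual source)
    def process_event_sketch(cache_key, start_time):
        data = default_cache.get(cache_key)
        if data is None:
            return  # the cached payload expired or was never written

        # let every plugin-provided processor rewrite the event payload
        for plugin in plugins.all(version=2):
            processors = safe_execute(plugin.get_event_preprocessors,
                                      data=data, _with_transaction=False)
            for processor in (processors or ()):
                result = safe_execute(processor, data)
                if result:
                    data = result

        # persist the (possibly rewritten) payload and hand off to save_event
        default_cache.set(cache_key, data, timeout=3600)
        save_event.delay(cache_key=cache_key, start_time=start_time)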

3. sentry.tasks.store.save_event

This task performs the actual write of data. As the source shows, it calls EventManager's save() method to process the request and persist it; this method is analyzed in detail below.

    try:
        manager = EventManager(data)
        manager.save(project)
    finally:
        if cache_key:
            default_cache.delete(cache_key)
        if start_time:
            metrics.timing('events.time-to-process', time() - start_time,
                           instance=data['platform'])

4. sentry.tasks.post_process.post_process_group

This asynchronous task is started as the last step of EventManager's save() method. post_process_group again iterates over the plugins to handle the event; its exact behaviour has not been explored in depth yet.
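
From the description alone, the plugin fan-out presumably looks something like the sketch below; the hook name post_process, the plugins.for_project() call and the argument list are assumptions written from memory rather than verified source.

    # hedged sketch of the plugin fan-out inside post_process_group
    def post_process_group_sketch(group, event, is_new, is_sample, is_regression):
        for plugin in plugins.for_project(event.project):
            # each plugin gets a chance to react (send a notification, call a webhook, ...)
            safe_execute(plugin.post_process,
                         event=event, group=group,
                         is_new=is_new, is_sample=is_sample,
                         is_regression=is_regression,
                         _with_transaction=False)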

Analysis of EventManager.save():

This method parses the individual fields of data and stores them into several tables. It is a few hundred lines long, the logic is fairly involved, and it is painful to read; I do not yet understand it fully. Roughly summarized, it does the following:

1. Extract fields such as event_id, level and date from data and wrap them into an Event object. Event is a model backed by the sentry_message table.

        # First we pull out our top-level (non-data attr) kwargs
        event_id = data.pop('event_id')
        level = data.pop('level')

        culprit = data.pop('culprit', None)
        logger_name = data.pop('logger', None)
        # ... more top-level fields are popped here ...
        # wrap everything into an Event object
        event = Event(
            project_id=project.id,
            event_id=event_id,
            data=data,
            time_spent=time_spent,
            datetime=date,
            **kwargs
        )

2. Build the tags. Every event has a tags field, a dict whose contents are later written into three models: EventTag, TagKey and TagValue.

        tags = dict(data.get('tags') or [])
        tags['level'] = LOG_LEVELS[level]
        if logger_name:
            tags['logger'] = logger_name
        if server_name:
            tags['server_name'] = server_name
        if site:
            tags['site'] = site
        if environment:
            tags['environment'] = environment
        if transaction_name:
            tags['transaction'] = transaction_name


Here, EventTag is backed by the sentry_eventtag table and links a TagKey id to a TagValue id, while TagKey and TagValue hold the actual tag keys and values.
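
To make the three-table layout concrete, here is an illustrative (not actual) version of what indexing a tags dict amounts to; the real index_event_tags task also maintains counters such as times_seen and uses more efficient bulk paths.

    # illustrative only: how {'level': 'error', 'server_name': 'web-1'} might land in the tag tables
    def index_tags_sketch(project, group, event, tags):
        for key, value in tags.items():
            tagkey, _ = TagKey.objects.get_or_create(project=project, key=key)
            tagvalue, _ = TagValue.objects.get_or_create(project=project, key=key, value=value)
            EventTag.objects.get_or_create(
                project_id=project.id,
                group_id=group.id,
                event_id=event.id,
                key_id=tagkey.id,      # row in sentry_filterkey
                value_id=tagvalue.id,  # row in sentry_filtervalue
            )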

3. Take the fingerprint from data and derive hashes from the fingerprint and the event:

        if fingerprint:
            hashes = [
                md5_from_hash(h)
                for h in get_hashes_from_fingerprint(event, fingerprint)
                ]
        elif checksum:
            hashes = [checksum]
            data['checksum'] = checksum
        else:
            hashes = [
                md5_from_hash(h)
                for h in get_hashes_for_event(event)
                ]
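
As a concrete example of the hashing step, md5_from_hash essentially folds a list of fingerprint components into a single md5 hex digest, so two events with the same components map to the same hash. The helper below is a stand-alone approximation for illustration; the real get_hashes_from_fingerprint also expands placeholders such as '{{ default }}' from the event's stacktrace.

    import hashlib

    def md5_from_parts(parts):
        """Approximation of md5_from_hash: join fingerprint parts and md5 them."""
        md5 = hashlib.md5()
        for part in parts:
            md5.update(str(part).encode('utf-8'))
        return md5.hexdigest()

    # two events with the same fingerprint end up in the same Group
    print(md5_from_parts(['my-view', 'TypeError']))
    print(md5_from_parts(['my-view', 'TypeError']))  # identical digest -> same issue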

The hashes are what _save_aggregate uses to aggregate multiple events into one issue: each event is mapped, via its hashes, to the group_id of a Group. Group is the model behind an issue and corresponds to the sentry_groupedmessage table in Postgres.

    def _save_aggregate(self, event, hashes, release, **kwargs):
        project = event.project

        # attempt to find a matching hash
        all_hashes = self._find_hashes(project, hashes)

        try:
            existing_group_id = six.next(h[0] for h in all_hashes if h[0])
        except StopIteration:
            existing_group_id = None

Next comes writing to the database tables.

4. Aggregate the event and write it out:

        if release:
            release = Release.get_or_create(
                project=project,
                version=release,
                date_added=date,
            )

            group_kwargs['first_release'] = release

        group, is_new, is_regression, is_sample = self._save_aggregate(
            event=event,
            hashes=hashes,
            release=release,
            **group_kwargs
        )

This step uses the release and project fields to get or create a record in sentry_release, and then, inside _save_aggregate, gets or creates an issue record and returns the group (i.e. the issue):

        if existing_group_id is None:
            kwargs['score'] = ScoreClause.calculate(1, kwargs['last_seen'])
            with transaction.atomic():
                short_id = project.next_short_id()
                group, group_is_new = Group.objects.create(
                    project=project,
                    short_id=short_id,
                    **kwargs
                ), True
        else:
            group = Group.objects.get(id=existing_group_id)

            group_is_new = False

5. Write to sentry_eventmapping:

        try:
            with transaction.atomic(using=router.db_for_write(EventMapping)):
                EventMapping.objects.create(
                    project=project, group=group, event_id=event_id)
        except IntegrityError:
            # a mapping with this event_id already exists for the project;
            # the duplicate event is dropped here
            return event

EventMapping stores the mapping from group to event; this is the first time any event-related information is written to the database.

6. Update the tsdb

First the environment and the GroupRelease are obtained from the project and the group, and then the tsdb is updated. If the group was just created, the environment and GroupRelease records are created as well. GroupRelease stores some basic information about the group, such as first_seen and last_seen.
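
No code is quoted for this step. Roughly speaking, the time-series update boils down to bumping a set of counters keyed by model and id, along the lines of the sketch below; the tsdb.models names are written from memory and should be treated as assumptions.

    # illustrative sketch of the time-series counters being bumped for this event
    tsdb.incr_multi([
        (tsdb.models.group, group.id),      # events seen for this issue
        (tsdb.models.project, project.id),  # events seen for this project
    ], timestamp=event.datetime)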

7. Update the sentry_userreport records whose event_id matches the current event

        reportToUpdate = UserReport.objects.filter(
            project=project, event_id=event_id,
        )
        reportToUpdate.update(group=group)

8. Write the event into the sentry_message table, and write the tags into the sentry_eventtag, sentry_filterkey and sentry_filtervalue tables:

        # save the event unless its been sampled
        if not is_sample:
            try:
                with transaction.atomic(using=router.db_for_write(Event)):
                    event.save()
            except IntegrityError:
                self.logger.info('duplicate.found', exc_info=True, extra={
                    'event_id': event_id,
                    'project_id': project.id,
                    'group_id': group.id,
                    'model': Event.__name__,
                })
                return event

            index_event_tags.delay(
                project_id=project.id,
                group_id=group.id,
                event_id=event.id,
                tags=tags,
            )

sentry_filterkey and sentry_filtervalue behave like lookup tables of constants, while sentry_eventtag stores each event's <key, value> tags, with key and value pointing into those two tables.

9. Send the first_event_received signal and kick off the post_process task

        if not raw:
            if not project.first_event:
                project.update(first_event=date)
                first_event_received.send(project=project, group=group, sender=Project)
            post_process_group.delay(
                group=group,
                event=event,
                is_new=is_new,
                is_sample=is_sample,
                is_regression=is_regression,
            )
        else:
            self.logger.info('post_process.skip.raw_event', extra={'event_id': event.id})

The first_event_received signal is received in sentry/receivers/onboarding.py:

@first_event_received.connect(weak=False)
def record_first_event(project, group, **kwargs):
    # body omitted; it updates the onboarding state described below
    pass

In this function, a record of the OrganizationOnboardingTask model is updated or created. It corresponds to the sentry_organizationonboardingtask table, which records, for each organization, its onboarding tasks, their status and when they were completed.

As things stand, the tables that directly contain event information are:

1. sentry_message

2. sentry_eventmapping

3. sentry_eventtag

Some other tables, such as sentry_groupedmessage and sentry_grouprelease, also contain event information indirectly through their first_seen and last_seen columns. When cleaning event records out of the database, one should think about how to keep these up to date as well.
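
As a closing illustration only (Sentry ships its own cleanup command, which is the safer route), a naive age-based prune with the Django ORM would at minimum have to touch the tables listed above:

    from datetime import timedelta
    from django.utils import timezone
    from sentry.models import Event, EventMapping, EventTag

    def prune_events_sketch(days=90):
        """Illustrative only: remove events older than `days` from the event tables."""
        cutoff = timezone.now() - timedelta(days=days)
        old_events = Event.objects.filter(datetime__lt=cutoff)
        event_ids = list(old_events.values_list('id', flat=True))

        EventTag.objects.filter(event_id__in=event_ids).delete()     # sentry_eventtag
        EventMapping.objects.filter(date_added__lt=cutoff).delete()  # sentry_eventmapping
        old_events.delete()                                          # sentry_message
        # note: Group.first_seen/last_seen and GroupRelease are not touched here;
        # keeping them consistent is exactly the open question raised above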


Copyright notice: this is an original article by ffantastic, released under the CC 4.0 BY-SA license. Please include a link to the original article and this notice when republishing.