Código fuente para report.api.views

from datetime import datetime

from django.conf import settings
from django.db.models import Q, Value
from django.db.models.functions import Concat
from django.shortcuts import get_object_or_404
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import viewsets, mixins
from rest_framework.authentication import SessionAuthentication
from rest_framework.filters import SearchFilter, OrderingFilter
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from report.api.filterset import ObjectChangeLogFilterSet
from report.models import (
    TaskReport,
    ObjectChangeLogReportBuilder,
    ObjectChangeLogReport,
)
from report.api.serializers import (
    ReportDataTableSerializer,
    ObjectChangeDataTableSerializer,
    ValidateObjectChangeFilters,
)
from django.db import connection

from report.utils import filter_period, format_date


[documentos]class ReportDataViewSet(viewsets.ViewSet): authentication_classes = [SessionAuthentication] permission_classes = [IsAuthenticated] queryset = TaskReport.objects.all() serializer_class = ReportDataTableSerializer pagination_class = LimitOffsetPagination filter_backends = (DjangoFilterBackend, SearchFilter, OrderingFilter) ordering_fields = ["pk"] ordering = ("pk",) limit = 10 offset = 0 report = None def get_queryset(self): self.datadict = { column["name"]: "ll.row_data->>" + str(i) for i, column in enumerate(self.report.table_content["columns"]) } self.translation_columns = { self.datadict[data]: self.request.GET[data].lower() for data in self.datadict if data in self.request.GET } sql_query = ( """Select ll.row_data::json from report_taskreport as lab join (SELECT id, jsonb_array_elements(jsonb_extract_path("table_content", 'dataset' )) as row_data from report_taskreport where id=%s) as ll on ll.id=lab.id %s order by %s %s;""" # noqa: E501 % ( self.pk, self.get_where_clause(), self.get_ordering(), self.get_limit_and_offset(), ) ) with connection.cursor() as cursor: cursor.execute(sql_query) return list(map(lambda x: x[0], cursor)) def get_queryset_total(self): sql_query = ( """Select COUNT(ll.row_data->>0) from report_taskreport as lab join (SELECT id, jsonb_array_elements(jsonb_extract_path("table_content", 'dataset' )) as row_data from report_taskreport where id=%s) as ll on ll.id=lab.id""" # noqa: E501 % (self.pk,) ) with connection.cursor() as cursor: cursor.execute(sql_query) return cursor.fetchone()[0] def get_queryset_record_filtered_total(self): sql_query = ( """Select COUNT(ll.row_data->>0) from report_taskreport as lab join (SELECT id, jsonb_array_elements(jsonb_extract_path("table_content", 'dataset' )) as row_data from report_taskreport where id= %s ) as ll on ll.id=lab.id %s""" # noqa: E501 % (self.pk, self.get_where_clause()) ) with connection.cursor() as cursor: cursor.execute(sql_query) return cursor.fetchone()[0] def get_where_clause(self): where_clause = "" for i, item in enumerate(self.translation_columns): if i: where_clause += " AND " where_clause += "LOWER(%s) LIKE '%%%s%%'" % ( item, self.clean_where_item(self.translation_columns[item]), ) if where_clause: where_clause = "where " + where_clause return where_clause def clean_where_item(self, value): return value def get_ordering(self): if "ordering" in self.request.GET: self.ordering = self.request.GET.getlist("ordering") order_by_str = "" ordering_str = "" order_asc_desc = "" for i, ordering in enumerate(self.ordering): if ordering.startswith("-"): ordering_str = self.datadict[ordering[1::]] order_asc_desc = "desc" else: ordering_str = self.datadict[ordering] order_asc_desc = "asc" if i: order_by_str += " , " order_by_str += ordering_str + " " + order_asc_desc return order_by_str def get_limit_and_offset(self): limit_offset_str = "" if "limit" in self.request.GET: self.limit = self.request.GET.get("limit", self.limit) if "offset" in self.request.GET: self.offset = self.request.GET.get("offset", self.offset) limit_offset_str = "limit %s offset %s" % (self.limit, self.offset) return limit_offset_str def get_serializer(self, data): return self.serializer_class(data) def paginate_queryset(self, queryset): self.paginator = self.pagination_class() return self.paginator.paginate_queryset(queryset, self.request, view=self) def retrieve(self, request, pk, **kwargs): self.request = request self.pk = pk self.report = get_object_or_404(TaskReport, pk=pk) data = self.get_queryset() response = { "data": data, "recordsTotal": self.get_queryset_total(), "recordsFiltered": self.get_queryset_record_filtered_total(), "draw": self.request.GET.get("draw", 1), } return Response(self.get_serializer(response).data)
[documentos]class ReportDataLogViewSet(mixins.RetrieveModelMixin, viewsets.GenericViewSet): authentication_classes = [SessionAuthentication] permission_classes = [IsAuthenticated] queryset = ObjectChangeLogReportBuilder.objects.all().using( settings.READONLY_DATABASE ) serializer_class = ObjectChangeDataTableSerializer pagination_class = LimitOffsetPagination filter_backends = (DjangoFilterBackend, SearchFilter, OrderingFilter) search_fields = [ "update_time", "old_value", "new_value", "diff_value", "user__first_name", "user__last_name", "user__username", ] filterset_class = ObjectChangeLogFilterSet ordering_fields = [ "pk", "user", "update_time", "old_value", "new_value", "diff_value", ] ordering = ("pk", "user", "update_time", "old_value", "new_value", "diff_value") laboratory = None object = None unit = None report = None
[documentos] def get_queryset(self): queryset = self.queryset if self.report: return queryset.filter(report=self.report) else: return queryset.none()
def format_date_range(self, value): dev = None try: dev = datetime.strptime(value, "%m/%d/%Y") except ValueError as e: pass return dev def filter_period(self, dates, queryset): dates = dates.split("-") if len(dates) != 2: return queryset dates[0] = self.format_date_range(dates[0].strip()) dates[1] = self.format_date_range(dates[1].strip()) return queryset.filter(update_time__gte=dates[0], update_time__lte=dates[1]) def create_extra_filters(self, queryset): filters_list = ["user", "new_value", "old_value", "diff_value"] filters = {"new_value": "", "diff_value": "", "old_value": "", "user": ""} queryset = queryset.annotate( fullname=Concat("user__first_name", Value(" "), "user__last_name") ) for data in self.request.GET: if data in filters_list: filters[data] = self.request.GET.get(data, "") queryset = queryset.filter( Q(fullname__icontains=filters["user"]), Q(user__username__icontains=filters["user"]), Q(new_value__icontains=filters["new_value"]), Q(old_value__icontains=filters["old_value"]), Q(diff_value__icontains=filters["diff_value"]), ).distinct() if "update_time" in self.request.GET: queryset = self.filter_period(self.request.GET.get("update_time"), queryset) return queryset def retrieve(self, request, pk, **kwargs): task_report = get_object_or_404(TaskReport, pk=pk) validate_serializer = ValidateObjectChangeFilters(data=self.request.GET) if validate_serializer.is_valid(): self.laboratory = validate_serializer.validated_data["laboratory"] self.object = validate_serializer.validated_data["object"] self.unit = validate_serializer.validated_data["unit"] self.report = ObjectChangeLogReport.objects.filter( task_report=task_report, laboratory=self.laboratory, object=self.object, unit=self.unit, ).first() queryset = self.filter_queryset(self.get_queryset()) if "ordering" in self.request.GET: queryset = queryset.order_by(self.request.GET["ordering"]) queryset = self.create_extra_filters(queryset) data = self.paginate_queryset(queryset) total = ObjectChangeLogReportBuilder.objects.count() response = { "data": data, "recordsTotal": total, "recordsFiltered": queryset.count(), "draw": self.request.GET.get("draw", 1), } return Response(self.get_serializer(response).data)