Package schemadiff
Expand source code
from typing import Union, List
from graphql import GraphQLSchema as GQLSchema, is_schema
from schemadiff.changes import Change
from schemadiff.diff.schema import Schema
from schemadiff.schema_loader import SchemaLoader
from schemadiff.formatting import print_diff, format_diff
from schemadiff.validation import validate_changes
SDL = str # Alias for string describing schema through schema definition language
def diff(old_schema: Union[SDL, GQLSchema], new_schema: Union[SDL, GQLSchema]) -> List[Change]:
    """Compare two graphql schemas highlighting dangerous and breaking changes.

    Args:
        old_schema: The original schema, given either as an SDL string or as
            an already built schema object.
        new_schema: The schema to compare against, in either of the same forms.

    Returns:
        changes (List[Change]): List of differences between both schemas with details about each change
    """
    # Schema objects are used as-is; anything else is handed to the SDL loader.
    first = old_schema
    if not is_schema(first):
        first = SchemaLoader.from_sdl(first)

    second = new_schema
    if not is_schema(second):
        second = SchemaLoader.from_sdl(second)

    return Schema(first, second).diff()
def diff_from_file(schema_file: str, other_schema_file: str) -> List[Change]:
    """Compare two graphql schema files highlighting dangerous and breaking changes.

    Args:
        schema_file: Path of the file holding the first schema.
        other_schema_file: Path of the file holding the schema to compare against.

    Returns:
        changes (List[Change]): List of differences between both schemas with details about each change
    """
    # Both files are loaded before the comparison is built.
    first = SchemaLoader.from_file(schema_file)
    second = SchemaLoader.from_file(other_schema_file)
    return Schema(first, second).diff()
# Names exported as the public API of the package (used by `from schemadiff import *`).
__all__ = [
'diff',
'diff_from_file',
'format_diff',
'print_diff',
'validate_changes',
'Change',
]
Sub-modules
schemadiff.allow_list
schemadiff.changes
schemadiff.formatting
schemadiff.schema_loader
schemadiff.validation
schemadiff.validation_rules
Functions
def diff(old_schema: Union[str, graphql.type.schema.GraphQLSchema], new_schema: Union[str, graphql.type.schema.GraphQLSchema]) ‑> List[Change]
-
Compare two graphql schemas highlighting dangerous and breaking changes.
Returns
changes (List[Change]): List of differences between both schemas with details about each change
Expand source code
def diff(old_schema: Union[SDL, GQLSchema], new_schema: Union[SDL, GQLSchema]) -> List[Change]:
    """Compare two graphql schemas highlighting dangerous and breaking changes.

    Args:
        old_schema: The original schema, given either as an SDL string or as
            an already built schema object.
        new_schema: The schema to compare against, in either of the same forms.

    Returns:
        changes (List[Change]): List of differences between both schemas with details about each change
    """
    # Schema objects are used as-is; anything else is handed to the SDL loader.
    first = old_schema
    if not is_schema(first):
        first = SchemaLoader.from_sdl(first)

    second = new_schema
    if not is_schema(second):
        second = SchemaLoader.from_sdl(second)

    return Schema(first, second).diff()
def diff_from_file(schema_file: str, other_schema_file: str)
-
Compare two graphql schema files highlighting dangerous and breaking changes.
Returns
changes (List[Change]): List of differences between both schemas with details about each change
Expand source code
def diff_from_file(schema_file: str, other_schema_file: str) -> List[Change]:
    """Compare two graphql schema files highlighting dangerous and breaking changes.

    Args:
        schema_file: Path of the file holding the first schema.
        other_schema_file: Path of the file holding the schema to compare against.

    Returns:
        changes (List[Change]): List of differences between both schemas with details about each change
    """
    # Both files are loaded before the comparison is built.
    first = SchemaLoader.from_file(schema_file)
    second = SchemaLoader.from_file(other_schema_file)
    return Schema(first, second).diff()
def format_diff(changes: List[Change]) ‑> str
-
Format a list of changes into a printable string
Expand source code
def format_diff(changes: List[Change]) -> str:
    """Format a list of changes into a printable string.

    Every change is rendered with ``format_change_by_criticality`` and the
    rendered lines are joined with newlines. When the joined text is empty,
    a message stating that both schemas are equal is returned instead.
    """
    rendered_lines = [format_change_by_criticality(change) for change in changes]
    report = '\n'.join(rendered_lines)
    if report:
        return report
    return '🎉 Both schemas are equal!'
def print_diff(changes: List[Change]) ‑> NoneType
-
Pretty print a list of changes
Expand source code
def print_diff(changes: List[Change]) -> None:
    """Pretty print a list of changes.

    The changes are rendered with ``format_diff`` and the resulting text is
    written with ``print``.
    """
    report = format_diff(changes)
    print(report)
def validate_changes(diff: List[Change], rules: List[str], allowed_changes: Dict[str, Any] = None) ‑> ValidationResult
-
Given a list of changes between schemas and a list of rules, it runs all rules against the changes, to detect invalid changes. It also admits an allowlist of accepted invalid changes to document exceptions to the rules
Returns
ValidationResult
- Result built from an is_valid flag (False if there is at least one restricted change, True otherwise) and the list of validation errors.
Expand source code
def validate_changes(diff: List[Change], rules: List[str], allowed_changes: Dict[str, Any] = None) -> ValidationResult:
    """Run a set of validation rules against a list of schema changes.

    Every rule named in ``rules`` is evaluated against every change in
    ``diff``. A change that fails a rule is marked as restricted and reported
    as an error, unless its checksum is present in ``allowed_changes``, which
    documents accepted exceptions to the rules.

    Args:
        diff: Changes between two schemas.
        rules: Names of the validation rules to apply.
        allowed_changes: Optional allowlist keyed by change checksum. When
            omitted or empty, no change is exempt.

    Returns:
        ValidationResult: Built from an ``is_valid`` flag and the list of
        validation errors. The flag is False if there is at least one
        restricted change, True otherwise.
    """
    allowed_changes = allowed_changes or {}
    is_valid = True
    errors = []
    rule_classes = ValidationRule.get_subclasses_by_names(rules)
    for change in diff:
        for rule in rule_classes:
            # Instantiate the rule once so that the verdict and the message
            # come from the same object.
            check = rule(change)
            if check.is_valid():
                continue
            # Allowlisted changes are accepted even though the rule failed.
            if change.checksum() in allowed_changes:
                continue
            change.restricted = check.message
            is_valid = False
            errors.append(ValidationError(rule.name, change.restricted, change))
    return ValidationResult(is_valid, errors)
Classes
class Change
-
Common interface of all schema changes
This class offers the common operations and properties of all schema changes. You may use it as a type hint to get better suggestions in your editor of choice.
Expand source code
class Change(ABC):
    """Common interface of all schema changes

    Provides the operations and properties shared by every schema change.
    It can be used as a type hint to get better suggestions in an editor.
    Subclasses must supply the ``message`` and ``path`` properties.
    """

    criticality: Criticality = None

    restricted: Optional[str] = None
    """Descriptive message only present when a change was restricted"""

    @property
    def breaking(self) -> bool:
        """True when the criticality level of this change is Breaking."""
        level = self.criticality.level
        return level == CriticalityLevel.Breaking

    @property
    def dangerous(self) -> bool:
        """True when the criticality level is Dangerous, i.e. the change might break some naive api clients."""
        level = self.criticality.level
        return level == CriticalityLevel.Dangerous

    @property
    def safe(self) -> bool:
        """True when the criticality level is NonBreaking, i.e. the change is safe for all clients."""
        level = self.criticality.level
        return level == CriticalityLevel.NonBreaking

    @property
    @abstractmethod
    def message(self) -> str:
        """Formatted message describing the change."""

    @property
    @abstractmethod
    def path(self) -> str:
        """Path to the schema member affected by the change."""

    def __repr__(self) -> str:
        return f"Change(criticality={self.criticality!r}, message={self.message!r}, path={self.path!r})"

    def __str__(self) -> str:
        # The plain string form of a change is its message.
        return self.message

    def to_dict(self) -> dict:
        """Get a detailed representation of a change as a dictionary."""
        details = {
            'message': self.message,
            'path': self.path,
            'is_safe_change': self.safe,
        }
        criticality = self.criticality
        details['criticality'] = {
            'level': criticality.level.value,
            'reason': criticality.reason,
        }
        details['checksum'] = self.checksum()
        return details

    def to_json(self) -> str:
        """Get a detailed representation of a change as a json string."""
        payload = self.to_dict()
        return json.dumps(payload)

    def checksum(self) -> str:
        """Get an identifier of a change (MD5 hex digest of its message). Used for allowlisting changes."""
        encoded_message = self.message.encode('utf-8')
        return hashlib.md5(encoded_message).hexdigest()
Ancestors
- abc.ABC
Subclasses
- FieldAbstractArgumentChange
- DirectiveChange
- EnumValueAdded
- EnumValueDeprecationReasonChanged
- EnumValueDescriptionChanged
- EnumValueRemoved
- FieldArgumentAdded
- FieldArgumentRemoved
- FieldDeprecationReasonChanged
- FieldDescriptionChanged
- FieldTypeChanged
- InputFieldAdded
- InputFieldDefaultChanged
- InputFieldDescriptionChanged
- InputFieldRemoved
- InputFieldTypeChanged
- AbstractInterfanceChange
- DroppedInterfaceImplementation
- InterfaceFieldAdded
- InterfaceFieldRemoved
- NewInterfaceImplemented
- ObjectTypeFieldAdded
- ObjectTypeFieldRemoved
- SchemaChange
- AddedType
- RemovedType
- TypeDescriptionChanged
- TypeKindChanged
- UnionMemberAdded
- UnionMemberRemoved
Class variables
var criticality : Criticality
var restricted : Optional[str]
-
Descriptive message only present when a change was restricted
Instance variables
var breaking : bool
-
Is this change a breaking change?
Expand source code
@property
def breaking(self) -> bool:
    """True when the criticality level of this change is Breaking."""
    level = self.criticality.level
    return level == CriticalityLevel.Breaking
var dangerous : bool
-
Is this a change which might break some naive api clients?
Expand source code
@property
def dangerous(self) -> bool:
    """True when the criticality level is Dangerous, i.e. the change might break some naive api clients."""
    level = self.criticality.level
    return level == CriticalityLevel.Dangerous
var message : str
-
Formatted change message
Expand source code
@property
@abstractmethod
def message(self) -> str:
    """Formatted message describing the change.

    Abstract: the body is empty here and must be supplied by subclasses.
    """
var path : str
-
Path to the affected schema member
Expand source code
@property
@abstractmethod
def path(self) -> str:
    """Path to the schema member affected by the change.

    Abstract: the body is empty here and must be supplied by subclasses.
    """
var safe : bool
-
Is this change safe for all clients?
Expand source code
@property
def safe(self) -> bool:
    """True when the criticality level is NonBreaking, i.e. the change is safe for all clients."""
    level = self.criticality.level
    return level == CriticalityLevel.NonBreaking
Methods
def checksum(self) ‑> str
-
Get an identifier of a change. Used for allowlisting changes
Expand source code
def checksum(self) -> str:
    """Get an identifier of a change. Used for allowlisting changes.

    The identifier is the MD5 hex digest of the UTF-8 encoded change message.
    """
    encoded_message = self.message.encode('utf-8')
    return hashlib.md5(encoded_message).hexdigest()
def to_dict(self) ‑> dict
-
Get detailed representation of a change
Expand source code
def to_dict(self) -> dict:
    """Get a detailed representation of a change as a dictionary.

    The result holds the message, the path, whether the change is safe, the
    criticality level value and reason, and the change checksum.
    """
    details = {
        'message': self.message,
        'path': self.path,
        'is_safe_change': self.safe,
    }
    criticality = self.criticality
    details['criticality'] = {
        'level': criticality.level.value,
        'reason': criticality.reason,
    }
    details['checksum'] = self.checksum()
    return details
def to_json(self) ‑> str
-
Get detailed representation of a change as a json string
Expand source code
def to_json(self) -> str:
    """Get a detailed representation of a change as a json string.

    Serializes the dictionary produced by ``to_dict``.
    """
    payload = self.to_dict()
    return json.dumps(payload)