Source code for pydna.assembly

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2013-2023 by Björn Johansson.  All rights reserved.
# This code is part of the Python-dna distribution and governed by its
# license.  Please see the LICENSE.txt file that should have been included
# as part of this package.

"""Assembly of sequences by homologous recombination.

Should also be useful for related techniques such as Gibson assembly and fusion
PCR. Given a list of sequences (Dseqrecords), all sequences are analyzed for
shared homology longer than the set limit.

A graph is constructed where each overlapping region form a node and
sequences separating the overlapping regions form edges.


::


                -- A --
    catgatctacgtatcgtgt     -- B --
                atcgtgtactgtcatattc
                            catattcaaagttct



    --x--> A --y--> B --z-->   (Graph)

    Nodes:

    A : atcgtgt
    B : catattc

    Edges:

    x : catgatctacgt
    y : actgt
    z : aaagttct


The NetworkX package is used to trace linear and circular paths through the
graph.
"""
import os as _os
from Bio.SeqFeature import SeqFeature as _SeqFeature
from Bio.SeqFeature import ExactPosition as _ExactPosition
from Bio.SeqFeature import SimpleLocation as _SimpleLocation
from Bio.SeqFeature import CompoundLocation as _CompoundLocation
from pydna.utils import rc as _rc

# from pydna.utils import memorize as _memorize
from pydna._pretty import pretty_str as _pretty_str
from pydna.contig import Contig as _Contig
from pydna.common_sub_strings import common_sub_strings, Match as _Match

from pydna.dseqrecord import Dseqrecord as _Dseqrecord
import networkx as _nx

from copy import deepcopy as _deepcopy
from typing import (
    Callable as _Callable,
    Dict as _Dict,
    List as _List,
    NamedTuple as _NamedTuple,
    TypedDict as _TypedDict,
)
import itertools as _itertools
import logging as _logging

# from func_timeout import func_set_timeout
# from wrapt_timeout_decorator import timeout
from pydna.threading_timer_decorator_exit import exit_after

_module_logger = _logging.getLogger("pydna." + __name__)


[docs]class Assembly(object): # , metaclass=_Memoize): """Assembly of a list of linear DNA fragments into linear or circular constructs. The Assembly is meant to replace the Assembly method as it is easier to use. Accepts a list of Dseqrecords (source fragments) to initiate an Assembly object. Several methods are available for analysis of overlapping sequences, graph construction and assembly. Parameters ---------- fragments : list a list of Dseqrecord objects. limit : int, optional The shortest shared homology to be considered algorithm : function, optional The algorithm used to determine the shared sequences. max_nodes : int The maximum number of nodes in the graph. This can be tweaked to manage sequences with a high number of shared sub sequences. Examples -------- >>> from pydna.assembly import Assembly >>> from pydna.dseqrecord import Dseqrecord >>> a = Dseqrecord("acgatgctatactgCCCCCtgtgctgtgctcta") >>> b = Dseqrecord("tgtgctgtgctctaTTTTTtattctggctgtatc") >>> c = Dseqrecord("tattctggctgtatcGGGGGtacgatgctatactg") >>> x = Assembly((a,b,c), limit=14) >>> x Assembly fragments....: 33bp 34bp 35bp limit(bp)....: 14 G.nodes......: 6 algorithm....: common_sub_strings >>> x.assemble_circular() [Contig(o59), Contig(o59)] >>> x.assemble_circular()[0].seq.watson 'acgatgctatactgCCCCCtgtgctgtgctctaTTTTTtattctggctgtatcGGGGGt' """ def __init__( self, frags: _List[_Dseqrecord], limit: int = 25, algorithm: _Callable[[str, str, int], _List[_Match]] = common_sub_strings, ) -> None: # Fragments is a string subclass with some extra properties # The order of the fragments has significance fragments: _List[_FragmentDict] = [ { "upper": str(f.seq).upper(), "mixed": str(f.seq), "name": f.name, "features": f.features, "nodes": [], } for f in frags ] # rcfragments is a dict with fragments as keys and the reverse # complement as value rcfragments: _Dict[str, _FragmentDict] = { f["mixed"]: { "upper": str(frc.seq).upper(), "mixed": str(frc.seq), "name": frc.name, "features": frc.features, "nodes": [], } for f, frc in zip(fragments, (f.rc() for f in frags)) } # The nodemap dict holds nodes and their reverse complements nodemap = { "begin": "end", "end": "begin", "begin_rc": "end_rc", "end_rc": "begin_rc", } # all combinations of fragments are compared. # see https://docs.python.org/3.10/library/itertools.html # itertools.combinations('ABCD', 2)--> AB AC AD BC BD CD for first, secnd in _itertools.combinations(fragments, 2): if first["upper"] == secnd["upper"]: continue firrc = rcfragments[first["mixed"]] secrc = rcfragments[secnd["mixed"]] # matches is a list of tuples of three integers describing # overlapping sequences: # (start position in first, start position in secnd, length) # This comparison is done using uppercase strings, see _ # Fragment class matches = algorithm(first["upper"], secnd["upper"], limit) for start_in_first, start_in_secnd, length in matches: # node is a string and represent the shared sequence in upper # case. node = first["upper"][start_in_first : start_in_first + length] first["nodes"].append(_NodeTuple(start_in_first, length, node)) secnd["nodes"].append(_NodeTuple(start_in_secnd, length, node)) # The same node exists between the reverse complements of # first and secnd # The new positions are calculated from the length of the # fragment and # the overlapping sequence start_in_firrc = len(first["upper"]) - start_in_first - length start_in_secrc = len(secnd["upper"]) - start_in_secnd - length # noderc is the reverse complement of node noderc = firrc["upper"][start_in_firrc : start_in_firrc + length] firrc["nodes"].append(_NodeTuple(start_in_firrc, length, noderc)) secrc["nodes"].append(_NodeTuple(start_in_secrc, length, noderc)) nodemap[node] = noderc # first is also compared to the rc of secnd matches = algorithm(first["upper"], secrc["upper"], limit) for start_in_first, start_in_secrc, length in matches: node = first["upper"][start_in_first : start_in_first + length] first["nodes"].append(_NodeTuple(start_in_first, length, node)) secrc["nodes"].append(_NodeTuple(start_in_secrc, length, node)) start_in_firrc, start_in_secnd = ( len(first["upper"]) - start_in_first - length, len(secnd["upper"]) - start_in_secrc - length, ) noderc = firrc["upper"][start_in_firrc : start_in_firrc + length] firrc["nodes"].append(_NodeTuple(start_in_firrc, length, noderc)) secnd["nodes"].append(_NodeTuple(start_in_secnd, length, noderc)) nodemap[node] = noderc # A directed graph class that can store multiedges. # Multiedges are multiple edges between two nodes. Each edge can hold # optional data or attributes. # https://networkx.github.io/documentation/stable/reference/classes/ # multidigraph.html order = 0 G = _nx.MultiDiGraph() # loop through all fragments their and reverse complements for f in fragments: f["nodes"] = sorted(set(f["nodes"])) for f in rcfragments.values(): f["nodes"] = sorted(set(f["nodes"])) for f in _itertools.chain(fragments, rcfragments.values()): # nodes are sorted in place in the order of their position # duplicates are removed (same position and sequence) # along the fragment since nodes are a tuple (position(int), # sequence(str)) before = G.order() G.add_nodes_from( (node, {"order": order + od, "length": length}) for od, (start, length, node) in enumerate(n for n in f["nodes"] if n[2] not in G) ) order += G.order() - before for (start1, length1, node1), ( start2, length2, node2, ) in _itertools.combinations(f["nodes"], 2): feats = [ ft for ft in f["features"] if start1 <= ft.location.start and start2 + G.nodes[node2]["length"] >= ft.location.end ] # for feat in feats: # feat.location += -start1 G.add_edge( node1, node2, # nodes (strings) piece=slice(start1, start2), # slice features=feats, # features seq=f["mixed"], # mixed case string name=f["name"], ) # string self.G = _nx.create_empty_copy(G) self.G.add_edges_from(sorted(G.edges(data=True), key=lambda t: len(t[2].get("seq", 1)), reverse=True)) self.nodemap = {**nodemap, **{nodemap[i]: i for i in nodemap}} self.limit = limit self.fragments = fragments self.rcfragments = rcfragments self.algorithm = algorithm @exit_after(int(_os.getenv("pydna_assembly_limit", 10))) def assemble_linear(self, start=None, end=None, max_nodes=None): G = _nx.MultiDiGraph(self.G) G.add_nodes_from(["begin", "begin_rc", "end", "end_rc"], length=0) # add edges from "begin" to nodes in the first # sequence in self.fragments firstfragment = self.fragments[0] for start, length, node in firstfragment["nodes"][::-1]: G.add_edge( "begin", node, piece=slice(0, start), features=[f for f in firstfragment["features"] if start + length >= f.location.end], seq=firstfragment["mixed"], name=firstfragment["name"], ) # add edges from "begin_rc" to nodes in the reverse complement of the # first sequence firstfragmentrc = self.rcfragments[firstfragment["mixed"]] for start, length, node in firstfragmentrc["nodes"][::-1]: G.add_edge( "begin_rc", node, piece=slice(0, start), features=[f for f in firstfragmentrc["features"] if start + length >= f.location.end], seq=firstfragmentrc["mixed"], name=firstfragmentrc["name"], ) # add edges from nodes in last sequence to "end" lastfragment = self.fragments[-1] for start, length, node in lastfragment["nodes"]: G.add_edge( node, "end", piece=slice(start, len(lastfragment["mixed"])), features=[f for f in lastfragment["features"] if start <= f.location.start], seq=lastfragment["mixed"], name=lastfragment["name"], ) # breakpoint() # add edges from nodes in last reverse complement sequence to "end_rc" lastfragmentrc = self.rcfragments[lastfragment["mixed"]] for start, length, node in lastfragmentrc["nodes"]: G.add_edge( node, "end_rc", piece=slice(start, len(lastfragmentrc["mixed"])), features=[f for f in lastfragmentrc["features"] if start <= f.location.start], seq=lastfragmentrc["mixed"], name=lastfragmentrc["name"], ) max_nodes = max_nodes or len(self.fragments) linearpaths = list( _itertools.chain( _nx.all_simple_paths(_nx.DiGraph(G), "begin", "end", cutoff=max_nodes), _nx.all_simple_paths(_nx.DiGraph(G), "begin", "end_rc", cutoff=max_nodes), _nx.all_simple_paths(_nx.DiGraph(G), "begin_rc", "end", cutoff=max_nodes), _nx.all_simple_paths(_nx.DiGraph(G), "begin_rc", "end_rc", cutoff=max_nodes), ) ) lps = {} for lp in linearpaths: edgelol = [] for u, v in zip(lp, lp[1:]): e = [] for d in G[u][v].values(): e.append((u, v, d)) edgelol.append(e) for edges in _itertools.product(*edgelol): # TODO explain if [ True for ((u, v, e), (x, y, z)) in zip(edges, edges[1:]) if ((e["seq"], e["piece"].stop) == (z["seq"], z["piece"].start)) ]: continue ct = "".join(e["seq"][e["piece"]] for u, v, e in edges) key = ct.upper() if key in lps: continue # TODO: is this test needed? sg = _nx.DiGraph() sg.add_edges_from(edges) sg.add_nodes_from((n, d) for n, d in G.nodes(data=True) if n in lp) edgefeatures = [] offset = 0 for u, v, e in edges: feats = _deepcopy(e["features"]) for f in feats: f.location += offset - e["piece"].start edgefeatures.extend(feats) offset += e["piece"].stop - e["piece"].start lps[key] = ct, edgefeatures, sg, {n: self.nodemap[n] for n in lp} return sorted( ( _Contig.from_string( lp[0], features=lp[1], graph=lp[2], nodemap=lp[3], linear=True, circular=False, ) for lp in lps.values() ), key=len, reverse=True, ) @exit_after(int(_os.getenv("pydna_assembly_limit", 10))) def assemble_circular(self, length_bound=None): cps = {} # circular assembly cpsrc = {} cpaths = sorted(_nx.simple_cycles(self.G, length_bound=length_bound), key=len) cpaths_sorted = [] for cpath in cpaths: order, node = min((self.G.nodes[node]["order"], node) for node in cpath) i = cpath.index(node) cpaths_sorted.append((order, cpath[i:] + cpath[:i])) cpaths_sorted.sort() for ( _, cp, ) in cpaths_sorted: # cpaths is a list of nodes representing a circular assembly edgelol = [] # edgelol is a list of lists of all edges along cp cp += cp[0:1] for u, v in zip(cp, cp[1:]): e = [] for d in self.G[u][v].values(): e.append((u, v, d)) edgelol.append(e) for edges in _itertools.product(*edgelol): if [ True for ((u, v, e), (x, y, z)) in zip(edges, edges[1:]) if ((e["seq"], e["piece"].stop) == (z["seq"], z["piece"].start)) ]: continue ct = "".join(e["seq"][e["piece"]] for u, v, e in edges) key = ct.upper() if key in cps or key in cpsrc: continue # TODO: is test in cpsrc needed? sg = _nx.DiGraph() sg.add_edges_from(edges) sg.add_nodes_from((n, d) for n, d in self.G.nodes(data=True) if n in cp) edgefeatures = [] offset = 0 for u, v, e in edges: feats = _deepcopy(e["features"]) for feat in feats: feat.location += offset edgefeatures.extend(feats) offset += e["piece"].stop - e["piece"].start for f in edgefeatures: if f.location.start > len(ct) and f.location.end > len(ct): f.location += -len(ct) elif f.location.end > len(ct): f.location = _CompoundLocation( ( _SimpleLocation(f.location.start, _ExactPosition(len(ct))), _SimpleLocation(_ExactPosition(0), f.location.end - len(ct)), ) ) cps[key] = cpsrc[_rc(key)] = ( ct, edgefeatures, sg, {n: self.nodemap[n] for n in cp[:-1]}, cp, ) return sorted( ( _Contig.from_string( cp[0], features=cp[1], graph=cp[2], nodemap=cp[3], linear=False, circular=True, ) for cp in cps.values() ), key=len, reverse=True, ) def __repr__(self) -> _pretty_str: # https://pyformat.info return _pretty_str( "Assembly\n" "fragments..: {sequences}\n" "limit(bp)..: {limit}\n" "G.nodes....: {nodes}\n" "algorithm..: {al}".format( sequences=" ".join("{}bp".format(len(x["mixed"])) for x in self.fragments), limit=self.limit, nodes=self.G.order(), al=self.algorithm.__name__, ) )
example_fragments = ( _Dseqrecord("AacgatCAtgctcc", name="a"), _Dseqrecord("TtgctccTAAattctgc", name="b"), _Dseqrecord("CattctgcGAGGacgatG", name="c"), ) linear_results = ( _Dseqrecord("AacgatCAtgctccTAAattctgcGAGGacgatG", name="abc"), _Dseqrecord("ggagcaTGatcgtCCTCgcagaatG", name="ac_rc"), _Dseqrecord("AacgatG", name="ac"), ) circular_results = ( _Dseqrecord("acgatCAtgctccTAAattctgcGAGG", name="abc", circular=True), _Dseqrecord("ggagcaTGatcgtCCTCgcagaatTTA", name="abc_rc", circular=True), ) class _NodeTuple(_NamedTuple): start: int length: int shared_seq: str # uppercase class _FragmentDict(_TypedDict): upper: str mixed: str name: str features: _List[_SeqFeature] nodes: _List[_NodeTuple] if __name__ == "__main__": import os as _os cached = _os.getenv("pydna_cached_funcs", "") _os.environ["pydna_cached_funcs"] = "" import doctest doctest.testmod(verbose=True, optionflags=doctest.ELLIPSIS) _os.environ["pydna_cached_funcs"] = cached