Discussion Forum

Insert Stream of Data

Good evening, everyone.

I am currently facing the problem of importing about 2 TB of Neo4j connections into Grakn via Python. And I would like to do this while I am still alive :slight_smile:

To realize this, I get a constant stream from Neo4j. Of course the read stream is much faster than the write access, which is why I work with multithreading (a thread pool with about 200 workers).

Each of these stream objects consists of two entities, each with a unique key, and a connection that links entity A and entity B.

So to write them into Grakn, I have to do the following for each object from the stream:

  1. check if A already exists
  2. check whether B already exists
  3. if A and B exist, check whether there is a connection between A and B with the same content
  4. create A, B and the connection if they do not exist yet

Since I have to create a new transaction for each of these items, writing them takes an incredible amount of time. Currently it already takes 10 minutes for 10,000 new objects.

Is there a way to write this stream into Grakn much faster via Python?

Thanks for your answer!

Code
import json
import random
import time
from multiprocessing.pool import ThreadPool

from grakn.client import GraknClient
from grakn.exception.GraknError import GraknError  # import path may vary between client versions

# simple shared progress counters (updated from the worker threads)
counter = 0
idle_threads = 0

# check whether an entity of the given type with the given key already exists;
# returns its internal id, or None if it does not exist
def get_business_object(session, type_, id_):
    with session.transaction().read() as read_transaction:
        answer_iterator = read_transaction.query(f'''
        match $A isa {type_}, has {type_.lower()}-id {json.dumps(id_)};
        get $A; offset 0; limit 1;
        ''')
        for res in answer_iterator:
            return res.map().get("A").id

# check whether a ginni_correlation between A and B with the same timebucket already exists;
# relation_mapper is a user-defined helper that maps an entity type to its role name
def check_if_relation_exists(session, id_A, type_A, id_B, type_B, timebucket):
    with session.transaction().read() as read_transaction:
        answer_iterator = read_transaction.query(f'''
        match $A isa {type_A}; $A id {id_A};
        $B isa {type_B}; $B id {id_B};
        $cont (source:$A, {relation_mapper(type_B)}:$B) isa ginni_correlation, has timebucket {timebucket};
        get $cont; offset 0; limit 1;
        ''')
        for res in answer_iterator:
            return res.map().get("cont").id

def create_grakn_object_function(res):
    # res is one record from the Neo4j result stream
    global counter, idle_threads

    # prepare data:
    A = res.get("A.key").split(":")
    B = res.get("B.key").split(":")
    weight = res.get("r.weight")
    timebucket = res.get("r.key")
    counter_A = res.get("con_1.counter")
    counter_B = res.get("con_2.counter")
    
    # check whether A and B already exist in Grakn:
    prev_A = get_business_object(session, A[0].upper(), A[1])
    prev_B = get_business_object(session, B[0].upper(), B[1])

    # if both A and B exist, check whether the ginni_correlation already exists:
    if prev_A and prev_B:
        correlation_exists = check_if_relation_exists(session=session,
                                                      id_A=prev_A,
                                                      type_A=A[0].upper(),
                                                      id_B=prev_B,
                                                      type_B=B[0].upper(),
                                                      timebucket=timebucket)
        if correlation_exists:
            # skip writing:
            return 0
    
    # match existing entities by their internal ids:
    cmd = ""
    if prev_A is not None or prev_B is not None:
        cmd += "match "
        if prev_A is not None:
            cmd += f"$A isa {A[0].upper()}; $A id {prev_A};"
        if prev_B is not None:
            cmd += f"$B isa {B[0].upper()}; $B id {prev_B};"
            
    # insert:
    cmd += "insert "
    
    if prev_A is None:
        cmd += f"$A isa {A[0].upper()}, has {A[0].lower()}-id {json.dumps(A[1])};"
    if prev_B is None:
        cmd += f"$B isa {B[0].upper()}, has {B[0].lower()}-id {json.dumps(B[1])};"
        
    # create correlation
    cmd += f"$cont (source:$A, {relation_mapper(B[0])}:$B) isa ginni_correlation, has weight {weight}, has timebucket {timebucket};"
    
    # add the per-timebucket counters (attribute name derived from the timebucket value)
    timebucket_suffix = str(timebucket).replace(".", "_")
    cmd += f"$A has timebucked-counter-{timebucket_suffix} {counter_A};"
    cmd += f"$B has timebucked-counter-{timebucket_suffix} {counter_B};"
    
    
    # write to db:
    try:
        with session.transaction().write() as write_transaction:
            write_transaction.query(cmd)
            write_transaction.commit()
            counter += 1
        return 0
    except GraknError:
        # back off for a short random interval, then retry this record
        random_timer = random.uniform(.1, .5)
        time.sleep(random_timer)

        idle_threads += 1
        create_grakn_object_function(res)

    return 0


with GraknClient(uri="****:***") as client:
    with client.session(keyspace="test") as session:
        with ThreadPool(processes=128) as pool:

            # neo4j_stream (the iterator of Neo4j records) and pbar (a progress bar)
            # are assumed to be defined elsewhere in the script
            for r in pool.imap_unordered(create_grakn_object_function, neo4j_stream, chunksize=32):
                pbar.update(1)

There’s no need to use a new transaction per query! That will come with a time cost. Instead, you can make up to about 500 queries per transaction before committing/closing it. That should give you a speedup!
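To make that concrete, here is a minimal, sequential sketch of such batching. It reuses the session and neo4j_stream from the code above; build_insert_query is a hypothetical helper that stands in for the query-building part of create_grakn_object_function (everything before the "write to db" step), and both that name and the batch size of 500 are illustrative rather than part of the original code.

# Minimal sketch: batch ~500 insert queries into a single write transaction.
# build_insert_query is a hypothetical helper returning the Graql insert string
# for one Neo4j record (as built in create_grakn_object_function above).
BATCH_SIZE = 500  # roughly the per-transaction query count suggested above

def write_batch(session, queries):
    # one write transaction (and one commit) for the whole batch
    with session.transaction().write() as write_transaction:
        for q in queries:
            write_transaction.query(q)
        write_transaction.commit()

batch = []
for record in neo4j_stream:
    batch.append(build_insert_query(record))
    if len(batch) >= BATCH_SIZE:
        write_batch(session, batch)
        batch = []

if batch:  # flush any remaining queries
    write_batch(session, batch)

The same idea applies to the read-side existence checks: several match queries can share one read transaction, so the per-transaction overhead is paid once per batch instead of once per object.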