AI

Code Execution in Qwen 3.6-35B-A3B Including Multimodal Inference, Control Inference, Tool Calling, MoE Routing, RAG, and Session Persistence

class QwenChat:
   """Thin chat-session wrapper around a Qwen model + processor.

   Maintains a running message history (role/content dicts), renders it
   through the processor's chat template, and offers blocking (`generate`)
   and streaming (`stream`) decoding plus JSON session save/load.

   Relies on names defined elsewhere in this file/module scope: `json`,
   `torch`, `threading`, `TextIteratorStreamer`, `SAMPLING`, `THINK_CLOSE`,
   and `split_thinking` — none are visible in this chunk; confirm imports.
   """
   def __init__(self, model, processor, system=None, tools=None):
       # model: a HF-style model exposing .generate() and .device;
       # processor: wraps the tokenizer and the chat template.
       self.model, self.processor = model, processor
       self.tokenizer = processor.tokenizer
       self.history: list[dict] = []
       # An optional system prompt becomes the first history entry.
       if system: self.history.append({"role": "system", "content": system})
       self.tools = tools  # tool schemas forwarded to apply_chat_template


   # --- fluent history builders: each appends one turn and returns self ---
   def user(self, content):      self.history.append({"role":"user","content":content}); return self
   def assistant(self, content, reasoning=""):
       # Non-empty reasoning is stored under "reasoning_content" so the chat
       # template can decide whether to replay or drop it on the next turn.
       m = {"role":"assistant","content":content}
       if reasoning: m["reasoning_content"] = reasoning
       self.history.append(m); return self
   def tool_result(self, name, result):
       # Non-string tool results are serialized to JSON before being stored.
       self.history.append({"role":"tool","name":name,
           "content": result if isinstance(result, str) else json.dumps(result)})
       return self


   def _inputs(self, enable_thinking, preserve_thinking):
       """Render history (+ tool schemas) to model-ready tensors on the model's device."""
       return self.processor.apply_chat_template(
           self.history, tools=self.tools, tokenize=True,
           add_generation_prompt=True, return_dict=True, return_tensors="pt",
           enable_thinking=enable_thinking, preserve_thinking=preserve_thinking,
       ).to(self.model.device)


   def generate(self, *, enable_thinking=True, preserve_thinking=False,
                max_new_tokens=2048, preset="thinking_general",
                stopping_criteria=None, append_to_history=True):
       """Run one blocking generation turn; return (thinking, answer).

       Samples with the SAMPLING[preset] hyperparameters, decodes only the
       newly generated tokens, and splits them via split_thinking.  The
       answer (with its reasoning) is appended to history unless
       append_to_history is False.
       """
       inp = self._inputs(enable_thinking, preserve_thinking)
       cfg = SAMPLING[preset]  # raises KeyError on an unknown preset
       gk = dict(**inp, max_new_tokens=max_new_tokens, do_sample=True,
                 temperature=cfg["temperature"], top_p=cfg["top_p"], top_k=cfg["top_k"],
                 repetition_penalty=1.0,
                 pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id)
       if stopping_criteria is not None: gk["stopping_criteria"] = stopping_criteria
       with torch.inference_mode(): out = self.model.generate(**gk)
       # Decode only the continuation: everything past the prompt length.
       raw = self.tokenizer.decode(out[0, inp["input_ids"].shape[-1]:], skip_special_tokens=True)
       think, ans = split_thinking(raw)
       if append_to_history: self.assistant(ans, reasoning=think)
       return think, ans


   def stream(self, *, enable_thinking=True, preserve_thinking=False,
              max_new_tokens=2048, preset="thinking_general",
              on_thinking=None, on_answer=None):
       """Stream one turn, invoking callbacks as text arrives.

       Generation runs on a background thread feeding TextIteratorStreamer.
       While inside the thinking segment, deltas go to on_thinking; once
       THINK_CLOSE appears, the remainder goes to on_answer.  The stripped
       (thinking, answer) pair is appended to history and returned.
       """
       inp = self._inputs(enable_thinking, preserve_thinking)
       cfg = SAMPLING[preset]
       streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)
       gk = dict(**inp, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True,
                 temperature=cfg["temperature"], top_p=cfg["top_p"], top_k=cfg["top_k"],
                 pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id)
       t = threading.Thread(target=self.model.generate, kwargs=gk); t.start()
       # buf accumulates raw text so a THINK_CLOSE tag split across stream
       # chunks is still detected; when thinking is disabled we start
       # directly in answer mode.
       buf, in_think = "", enable_thinking
       think_text, answer_text = "", ""
       for piece in streamer:
           buf += piece
           if in_think:
               if THINK_CLOSE in buf:
                   close_at = buf.index(THINK_CLOSE)
                   resid = buf[:close_at]
                   # Flush only the not-yet-reported tail of the thinking text.
                   if on_thinking: on_thinking(resid[len(think_text):])
                   think_text = resid
                   buf = buf[close_at + len(THINK_CLOSE):]
                   in_think = False
                   # Anything already past the tag belongs to the answer.
                   if buf and on_answer: on_answer(buf)
                   answer_text = buf; buf = ""
               else:
                   if on_thinking: on_thinking(piece)
                   think_text += piece
           else:
               if on_answer: on_answer(piece)
               answer_text += piece
       t.join()
       self.assistant(answer_text.strip(), reasoning=think_text.strip())
       return think_text.strip(), answer_text.strip()


   def save(self, path):
       """Persist history and tool schemas to `path` as pretty-printed JSON."""
       with open(path, "w") as f:
           json.dump({"history": self.history, "tools": self.tools}, f, indent=2)
   @classmethod
   def load(cls, model, processor, path):
       """Rebuild a QwenChat from a file previously written by save()."""
       with open(path) as f: data = json.load(f)
       c = cls(model, processor, tools=data.get("tools"))
       c.history = data["history"]; return c


class ThinkingBudget(StoppingCriteria):
   """Stop generation once the thinking segment exceeds `budget` tokens.

   Watches the generated ids for the THINK_OPEN marker; after it appears,
   generation is halted when `budget` tokens have been produced without a
   THINK_CLOSE marker.  Once the close marker is seen, the criterion is
   permanently inactive for the rest of the generation.
   """
   def __init__(self, tokenizer, budget: int):
       self.budget = budget
       self.open_ids  = tokenizer.encode(THINK_OPEN,  add_special_tokens=False)
       self.close_ids = tokenizer.encode(THINK_CLOSE, add_special_tokens=False)
       self.start = None    # index just past THINK_OPEN, once observed
       self.closed = False  # latched True when THINK_CLOSE is found
   def _find(self, seq, needle):
       """Return the first index of sublist `needle` in `seq`, else None."""
       n = len(needle)
       for i in range(len(seq) - n + 1):
           if seq[i:i + n] == needle: return i
       return None
   def __call__(self, input_ids, scores, **kwargs):
       # Fix: the original re-scanned the whole sequence for the close tag on
       # every decoding step even after it had been found (quadratic overall);
       # latch `closed` so the scan stops once the tag has been seen.
       if self.closed: return False
       seq = input_ids[0].tolist()  # NOTE(review): assumes batch size 1
       if self.start is None:
           idx = self._find(seq, self.open_ids)
           if idx is not None: self.start = idx + len(self.open_ids)
           return False
       if self._find(seq[self.start:], self.close_ids) is not None:
           self.closed = True
           return False
       return (len(seq) - self.start) >= self.budget


TOOL_CALL_RE = re.compile(r"s*({.*?})s*", re.S)


def run_calculate(expr: str) -> str:
   """Evaluate an arithmetic expression and return a JSON result string.

   Only digits, arithmetic operators, parentheses, percent, dot and spaces
   are permitted, so the guarded eval below cannot reference any names.
   NOTE(review): '*' is allowed, so an expression like 9**9**9 can still
   consume unbounded CPU/memory — consider a length/complexity cap.

   Returns {"result": value} on success or {"error": message} on failure.
   """
   allowed = set("0123456789+-*/().% ")
   if not all(ch in allowed for ch in expr):
       return json.dumps({"error": "illegal chars"})
   try:
       # Empty globals/builtins: no names can resolve inside the expression.
       value = eval(expr, {"__builtins__": {}}, {})
   except Exception as exc:
       return json.dumps({"error": str(exc)})
   return json.dumps({"result": value})


# Tiny in-memory corpus backing the search_docs demo tool.
_DOCS = {
   "qwen3.6":  "Qwen3.6-35B-A3B is a 35B MoE with 3B active params and 262k native context.",
   "deltanet": "Gated DeltaNet is a linear-attention variant used in Qwen3.6's hybrid layers.",
   "moe":      "Qwen3.6 uses 256 experts with 8 routed + 1 shared per token.",
}
def run_search_docs(q):
   """Return a JSON string of doc snippets whose key occurs in the query."""
   needle = q.lower()
   hits = []
   for key, text in _DOCS.items():
       if key in needle:
           hits.append(text)
   if not hits:
       hits = ["no hits"]
   return json.dumps({"results": hits})
def run_get_time():
   """Return the current UTC time as a JSON string: {"iso": "<ISO-8601>Z"}.

   Fix: datetime.utcnow() is naive and deprecated since Python 3.12; use a
   timezone-aware now(timezone.utc) instead.  The "+00:00" offset suffix is
   rewritten to "Z" to keep the original output shape.
   """
   import datetime as dt
   now = dt.datetime.now(dt.timezone.utc)
   return json.dumps({"iso": now.isoformat().replace("+00:00", "Z")})


# Dispatch table: tool name -> adapter that pulls the expected argument(s)
# out of the parsed arguments dict and calls the matching run_* helper.
TOOL_FNS = {
   "calculate":   lambda a: run_calculate(a["expression"]),
   "search_docs": lambda a: run_search_docs(a["query"]),
   "get_time":    lambda a: run_get_time(),  # takes no arguments; dict ignored
}
# OpenAI-style function-calling schemas advertised to the model.  Names and
# required parameters mirror the TOOL_FNS dispatch table: "calculate" takes
# a required "expression" string, "search_docs" a required "query" string,
# and "get_time" takes no parameters.
TOOLS_SCHEMA = [
   {"type":"function","function":{"name":"calculate","description":"Evaluate arithmetic.",
     "parameters":{"type":"object","properties":{"expression":{"type":"string"}},"required":["expression"]}}},
   {"type":"function","function":{"name":"search_docs","description":"Search internal docs.",
     "parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}},
   {"type":"function","function":{"name":"get_time","description":"Get current UTC time.",
     "parameters":{"type":"object","properties":{}}}},
]

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button